[GOBBLIN-298] Add metric and event reporters that emit using a KafkaProducer
Closes #2153 from htran1/metrics09 Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ee770f5c Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ee770f5c Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ee770f5c Branch: refs/heads/master Commit: ee770f5c5aeec469d7d93c016ce0a25200932eb2 Parents: 90d8495 Author: Hung Tran <[email protected]> Authored: Fri Oct 27 12:00:41 2017 -0700 Committer: Hung Tran <[email protected]> Committed: Fri Oct 27 12:00:41 2017 -0700 ---------------------------------------------------------------------- .../gobblin/metrics/KafkaReportingFormats.java | 83 ------- .../metrics/kafka/KafkaAvroEventReporter.java | 122 ---------- .../metrics/kafka/KafkaAvroReporter.java | 105 -------- .../metrics/kafka/KafkaEventReporter.java | 151 ------------ .../metrics/kafka/KafkaProducerPusher.java | 91 +++++++ .../gobblin/metrics/kafka/KafkaPusher.java | 3 +- .../gobblin/metrics/kafka/KafkaReporter.java | 147 ----------- .../metrics/kafka/KafkaReporterFactory.java | 104 -------- .../reporter/KafkaAvroEventReporterTest.java | 50 ---- .../metrics/reporter/KafkaAvroReporterTest.java | 68 ------ .../reporter/KafkaEventReporterTest.java | 150 ------------ .../metrics/reporter/KafkaReporterTest.java | 242 ------------------- .../metrics/kafka/KafkaProducerPusher.java | 91 +++++++ .../reporter/KafkaProducerPusherTest.java | 91 +++++++ .../gobblin/metrics/KafkaReportingFormats.java | 83 +++++++ .../metrics/kafka/KafkaAvroEventReporter.java | 122 ++++++++++ .../metrics/kafka/KafkaAvroReporter.java | 105 ++++++++ .../metrics/kafka/KafkaAvroSchemaRegistry.java | 2 +- .../metrics/kafka/KafkaEventReporter.java | 170 +++++++++++++ .../gobblin/metrics/kafka/KafkaReporter.java | 150 ++++++++++++ .../metrics/kafka/KafkaReporterFactory.java | 113 +++++++++ .../apache/gobblin/metrics/kafka/Pusher.java | 33 +++ 
.../gobblin/metrics/kafka/PusherUtils.java | 47 ++++ .../reporter/KafkaAvroEventReporterTest.java | 50 ++++ .../metrics/reporter/KafkaAvroReporterTest.java | 67 +++++ .../reporter/KafkaEventReporterTest.java | 150 ++++++++++++ .../metrics/reporter/KafkaReporterTest.java | 240 ++++++++++++++++++ .../metrics/reporter/MockKafkaPusher.java | 55 +++++ 28 files changed, 1660 insertions(+), 1225 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java deleted file mode 100644 index 5f47121..0000000 --- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gobblin.metrics; - -import java.util.Properties; - -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.metrics.kafka.KafkaAvroEventReporter; -import org.apache.gobblin.metrics.kafka.KafkaAvroReporter; -import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry; -import org.apache.gobblin.metrics.kafka.KafkaEventReporter; -import org.apache.gobblin.metrics.kafka.KafkaReporter; - - -/** - * Kafka reporting formats enumeration. - */ -public enum KafkaReportingFormats { - - AVRO, - JSON; - - /** - * Get a {@link org.apache.gobblin.metrics.kafka.KafkaReporter.Builder} for this reporting format. - * - * @param properties {@link java.util.Properties} containing information to build reporters. - * @return {@link org.apache.gobblin.metrics.kafka.KafkaReporter.Builder}. - */ - public KafkaReporter.Builder<?> metricReporterBuilder(Properties properties) { - switch (this) { - case AVRO: - KafkaAvroReporter.Builder<?> builder = KafkaAvroReporter.BuilderFactory.newBuilder(); - if (Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY, - ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) { - builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties)); - } - return builder; - case JSON: - return KafkaReporter.BuilderFactory.newBuilder(); - default: - // This should never happen. - throw new IllegalArgumentException("KafkaReportingFormat not recognized."); - } - } - - /** - * Get a {@link org.apache.gobblin.metrics.kafka.KafkaEventReporter.Builder} for this reporting format. - * @param context {@link org.apache.gobblin.metrics.MetricContext} that should be reported. - * @param properties {@link java.util.Properties} containing information to build reporters. - * @return {@link org.apache.gobblin.metrics.kafka.KafkaEventReporter.Builder}. 
- */ - public KafkaEventReporter.Builder<?> eventReporterBuilder(MetricContext context, Properties properties) { - switch (this) { - case AVRO: - KafkaAvroEventReporter.Builder<?> builder = KafkaAvroEventReporter.Factory.forContext(context); - if (Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY, - ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) { - builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties)); - } - return builder; - case JSON: - return KafkaEventReporter.Factory.forContext(context); - default: - // This should never happen. - throw new IllegalArgumentException("KafkaReportingFormat not recognized."); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventReporter.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventReporter.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventReporter.java deleted file mode 100644 index 5d35c87..0000000 --- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventReporter.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.metrics.kafka; - -import java.io.IOException; - -import org.apache.avro.Schema; - -import com.google.common.base.Optional; - -import org.apache.gobblin.metrics.GobblinTrackingEvent; -import org.apache.gobblin.metrics.MetricContext; -import org.apache.gobblin.metrics.reporter.util.AvroBinarySerializer; -import org.apache.gobblin.metrics.reporter.util.AvroSerializer; -import org.apache.gobblin.metrics.reporter.util.SchemaRegistryVersionWriter; -import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter; - - -/** - * {@link org.apache.gobblin.metrics.reporter.EventReporter} that emits events to Kafka as serialized Avro records. - */ -public class KafkaAvroEventReporter extends KafkaEventReporter { - - protected KafkaAvroEventReporter(Builder<?> builder) throws IOException { - super(builder); - if(builder.registry.isPresent()) { - Schema schema = - new Schema.Parser().parse(getClass().getClassLoader().getResourceAsStream("GobblinTrackingEvent.avsc")); - this.serializer.setSchemaVersionWriter(new SchemaRegistryVersionWriter(builder.registry.get(), builder.topic, - Optional.of(schema))); - } - } - - @Override - protected AvroSerializer<GobblinTrackingEvent> createSerializer(SchemaVersionWriter schemaVersionWriter) - throws IOException { - return new AvroBinarySerializer<GobblinTrackingEvent>(GobblinTrackingEvent.SCHEMA$, schemaVersionWriter); - } - - /** - * Returns a new {@link KafkaAvroEventReporter.Builder} for {@link KafkaAvroEventReporter}. 
- * - * @param context the {@link org.apache.gobblin.metrics.MetricContext} to report - * @return KafkaAvroReporter builder - * @deprecated this method is bugged. Use {@link KafkaAvroEventReporter.Factory#forContext} instead. - */ - @Deprecated - public static Builder<? extends Builder<?>> forContext(MetricContext context) { - return new BuilderImpl(context); - } - - private static class BuilderImpl extends Builder<BuilderImpl> { - private BuilderImpl(MetricContext context) { - super(context); - } - - @Override - protected BuilderImpl self() { - return this; - } - } - - public static abstract class Factory { - /** - * Returns a new {@link KafkaAvroEventReporter.Builder} for {@link KafkaAvroEventReporter}. - * - * @param context the {@link org.apache.gobblin.metrics.MetricContext} to report - * @return KafkaAvroReporter builder - */ - public static KafkaAvroEventReporter.BuilderImpl forContext(MetricContext context) { - return new BuilderImpl(context); - } - } - - /** - * Builder for {@link KafkaAvroEventReporter}. - * Defaults to no filter, reporting rates in seconds and times in milliseconds. - */ - public static abstract class Builder<T extends Builder<T>> extends KafkaEventReporter.Builder<T> { - - private Optional<KafkaAvroSchemaRegistry> registry = Optional.absent(); - - private Builder(MetricContext context) { - super(context); - } - - public T withSchemaRegistry(KafkaAvroSchemaRegistry registry) { - this.registry = Optional.of(registry); - return self(); - } - - /** - * Builds and returns {@link KafkaAvroEventReporter}. 
- * - * @param brokers string of Kafka brokers - * @param topic topic to send metrics to - * @return KafkaAvroReporter - */ - public KafkaAvroEventReporter build(String brokers, String topic) throws IOException { - this.brokers = brokers; - this.topic = topic; - return new KafkaAvroEventReporter(this); - } - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java deleted file mode 100644 index 35d558e..0000000 --- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gobblin.metrics.kafka; - -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.metrics.MetricReport; -import org.apache.gobblin.metrics.reporter.util.AvroBinarySerializer; -import org.apache.gobblin.metrics.reporter.util.AvroSerializer; -import org.apache.gobblin.metrics.reporter.util.SchemaRegistryVersionWriter; -import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter; -import org.apache.gobblin.util.ConfigUtils; - -import java.io.IOException; -import java.util.Properties; - -import org.apache.avro.Schema; - -import com.google.common.base.Optional; -import com.typesafe.config.Config; - - -/** - * Kafka reporter for codahale metrics writing metrics in Avro format. - * - * @author ibuenros - */ -public class KafkaAvroReporter extends KafkaReporter { - - protected KafkaAvroReporter(Builder<?> builder, Config config) throws IOException { - super(builder, config); - if (builder.registry.isPresent()) { - Schema schema = - new Schema.Parser().parse(getClass().getClassLoader().getResourceAsStream("MetricReport.avsc")); - this.serializer.setSchemaVersionWriter(new SchemaRegistryVersionWriter(builder.registry.get(), builder.topic, - Optional.of(schema))); - } - } - - @Override - protected AvroSerializer<MetricReport> createSerializer(SchemaVersionWriter schemaVersionWriter) - throws IOException { - return new AvroBinarySerializer<>(MetricReport.SCHEMA$, schemaVersionWriter); - } - - /** - * A static factory class for obtaining new {@link org.apache.gobblin.metrics.kafka.KafkaAvroReporter.Builder}s - * - * @see org.apache.gobblin.metrics.kafka.KafkaAvroReporter.Builder - */ - public static class BuilderFactory { - - public static BuilderImpl newBuilder() { - return new BuilderImpl(); - } - } - - public static class BuilderImpl extends Builder<BuilderImpl> { - - @Override - protected BuilderImpl self() { - return this; - } - } - - /** - * Builder for {@link KafkaAvroReporter}. 
Defaults to no filter, reporting rates in seconds and times in milliseconds. - */ - public static abstract class Builder<T extends Builder<T>> extends KafkaReporter.Builder<T> { - - private Optional<KafkaAvroSchemaRegistry> registry = Optional.absent(); - - public T withSchemaRegistry(KafkaAvroSchemaRegistry registry) { - this.registry = Optional.of(registry); - return self(); - } - - /** - * Builds and returns {@link KafkaAvroReporter}. - * - * @param brokers string of Kafka brokers - * @param topic topic to send metrics to - * @return KafkaAvroReporter - */ - public KafkaAvroReporter build(String brokers, String topic, Properties props) throws IOException { - this.brokers = brokers; - this.topic = topic; - return new KafkaAvroReporter(this, ConfigUtils.propertiesToConfig(props, Optional.of(ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX))); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java deleted file mode 100644 index c23294a..0000000 --- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.metrics.kafka; - -import java.io.IOException; -import java.util.List; -import java.util.Queue; - -import com.google.common.base.Optional; -import com.google.common.collect.Lists; - -import org.apache.gobblin.metrics.GobblinTrackingEvent; -import org.apache.gobblin.metrics.MetricContext; -import org.apache.gobblin.metrics.reporter.EventReporter; -import org.apache.gobblin.metrics.reporter.util.AvroJsonSerializer; -import org.apache.gobblin.metrics.reporter.util.AvroSerializer; -import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter; -import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter; - - -/** - * Reports {@link org.apache.gobblin.metrics.GobblinTrackingEvent} to a Kafka topic serialized as JSON. 
- */ -public class KafkaEventReporter extends EventReporter { - - protected final AvroSerializer<GobblinTrackingEvent> serializer; - private final KafkaPusher kafkaPusher; - - public KafkaEventReporter(Builder<?> builder) throws IOException { - super(builder); - - this.serializer = this.closer.register( - createSerializer(new FixedSchemaVersionWriter())); - - if(builder.kafkaPusher.isPresent()) { - this.kafkaPusher = builder.kafkaPusher.get(); - } else { - this.kafkaPusher = this.closer.register(new KafkaPusher(builder.brokers, builder.topic)); - } - - } - - @Override - public void reportEventQueue(Queue<GobblinTrackingEvent> queue) { - GobblinTrackingEvent nextEvent; - List<byte[]> events = Lists.newArrayList(); - - while(null != (nextEvent = queue.poll())) { - events.add(this.serializer.serializeRecord(nextEvent)); - } - - if (!events.isEmpty()) { - this.kafkaPusher.pushMessages(events); - } - - } - - protected AvroSerializer<GobblinTrackingEvent> createSerializer(SchemaVersionWriter schemaVersionWriter) throws IOException { - return new AvroJsonSerializer<GobblinTrackingEvent>(GobblinTrackingEvent.SCHEMA$, schemaVersionWriter); - } - - /** - * Returns a new {@link KafkaEventReporter.Builder} for {@link KafkaEventReporter}. - * Will automatically add all Context tags to the reporter. - * - * @param context the {@link org.apache.gobblin.metrics.MetricContext} to report - * @return KafkaReporter builder - * @deprecated this method is bugged. Use {@link KafkaEventReporter.Factory#forContext} instead. - */ - @Deprecated - public static Builder<? extends Builder> forContext(MetricContext context) { - return new BuilderImpl(context); - } - - public static class BuilderImpl extends Builder<BuilderImpl> { - private BuilderImpl(MetricContext context) { - super(context); - } - - @Override - protected BuilderImpl self() { - return this; - } - } - - public static class Factory { - /** - * Returns a new {@link KafkaEventReporter.Builder} for {@link KafkaEventReporter}. 
- * Will automatically add all Context tags to the reporter. - * - * @param context the {@link org.apache.gobblin.metrics.MetricContext} to report - * @return KafkaReporter builder - */ - public static BuilderImpl forContext(MetricContext context) { - return new BuilderImpl(context); - } - } - - /** - * Builder for {@link KafkaEventReporter}. - * Defaults to no filter, reporting rates in seconds and times in milliseconds. - */ - public static abstract class Builder<T extends EventReporter.Builder<T>> - extends EventReporter.Builder<T> { - protected String brokers; - protected String topic; - protected Optional<KafkaPusher> kafkaPusher; - - protected Builder(MetricContext context) { - super(context); - this.kafkaPusher = Optional.absent(); - } - - /** - * Set {@link org.apache.gobblin.metrics.kafka.KafkaPusher} to use. - */ - public T withKafkaPusher(KafkaPusher pusher) { - this.kafkaPusher = Optional.of(pusher); - return self(); - } - - /** - * Builds and returns {@link KafkaEventReporter}. 
- * - * @param brokers string of Kafka brokers - * @param topic topic to send metrics to - * @return KafkaReporter - */ - public KafkaEventReporter build(String brokers, String topic) throws IOException { - this.brokers = brokers; - this.topic = topic; - return new KafkaEventReporter(this); - } - - } -} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java new file mode 100644 index 0000000..ff75a92 --- /dev/null +++ b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.metrics.kafka; + +import java.io.IOException; +import java.util.List; +import java.util.Properties; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.StringSerializer; + +import com.google.common.base.Optional; +import com.google.common.io.Closer; +import com.typesafe.config.Config; + +import org.apache.gobblin.util.ConfigUtils; + + +/** + * Establishes a connection to a Kafka cluster and push byte messages to a specified topic. + */ +public class KafkaProducerPusher implements Pusher { + + private final String topic; + private final KafkaProducer<String, byte[]> producer; + private final Closer closer; + + public KafkaProducerPusher(String brokers, String topic, Optional<Config> kafkaConfig) { + this.closer = Closer.create(); + + this.topic = topic; + + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + props.put(ProducerConfig.ACKS_CONFIG, "1"); + + // add the kafka scoped config. if any of the above are specified then they are overridden + if (kafkaConfig.isPresent()) { + props.putAll(ConfigUtils.configToProperties(kafkaConfig.get())); + } + + this.producer = createProducer(props); + } + + public KafkaProducerPusher(String brokers, String topic) { + this(brokers, topic, Optional.absent()); + } + + /** + * Push all byte array messages to the Kafka topic. + * @param messages List of byte array messages to push to Kakfa. 
+ */ + public void pushMessages(List<byte[]> messages) { + for (byte[] message: messages) { + this.producer.send(new ProducerRecord<String, byte[]>(topic, message)); + } + } + + @Override + public void close() + throws IOException { + this.closer.close(); + } + + /** + * Create the Kafka producer. + */ + protected KafkaProducer<String, byte[]> createProducer(Properties props) { + return this.closer.register(new KafkaProducer<String, byte[]>(props)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaPusher.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaPusher.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaPusher.java index 29162ac..1c977ff 100644 --- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaPusher.java +++ b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaPusher.java @@ -17,7 +17,6 @@ package org.apache.gobblin.metrics.kafka; -import java.io.Closeable; import java.io.IOException; import java.util.List; import java.util.Properties; @@ -34,7 +33,7 @@ import kafka.producer.ProducerConfig; /** * Establishes a connection to a Kafka cluster and pushed byte messages to a specified topic. 
*/ -public class KafkaPusher implements Closeable { +public class KafkaPusher implements Pusher { private final String topic; private final ProducerCloseable<String, byte[]> producer; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java deleted file mode 100644 index 2aa0e97..0000000 --- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gobblin.metrics.kafka; - -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.metrics.MetricReport; -import org.apache.gobblin.metrics.reporter.MetricReportReporter; -import org.apache.gobblin.metrics.reporter.util.AvroJsonSerializer; -import org.apache.gobblin.metrics.reporter.util.AvroSerializer; -import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter; -import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter; -import org.apache.gobblin.util.ClassAliasResolver; -import org.apache.gobblin.util.ConfigUtils; - -import java.io.IOException; -import java.util.Properties; - -import com.google.common.base.Optional; -import com.google.common.collect.Lists; -import com.typesafe.config.Config; - -import lombok.extern.slf4j.Slf4j; - - -/** - * Kafka reporter for metrics. - * - * @author ibuenros - */ -@Slf4j -public class KafkaReporter extends MetricReportReporter { - - public static final String SCHEMA_VERSION_WRITER_TYPE = "metrics.kafka.schemaVersionWriterType"; - - protected final AvroSerializer<MetricReport> serializer; - protected final KafkaPusher kafkaPusher; - - - protected KafkaReporter(Builder<?> builder, Config config) throws IOException { - super(builder, config); - - SchemaVersionWriter versionWriter; - if (config.hasPath(SCHEMA_VERSION_WRITER_TYPE)) { - try { - ClassAliasResolver<SchemaVersionWriter> resolver = new ClassAliasResolver<>(SchemaVersionWriter.class); - Class<? 
extends SchemaVersionWriter> klazz = resolver.resolveClass(config.getString(SCHEMA_VERSION_WRITER_TYPE)); - versionWriter = klazz.newInstance(); - } catch (ReflectiveOperationException roe) { - throw new IOException("Could not instantiate version writer.", roe); - } - } else { - versionWriter = new FixedSchemaVersionWriter(); - } - - log.info("Schema version writer: " + versionWriter.getClass().getName()); - this.serializer = this.closer.register(createSerializer(versionWriter)); - - if (builder.kafkaPusher.isPresent()) { - this.kafkaPusher = builder.kafkaPusher.get(); - } else { - this.kafkaPusher = this.closer.register(new KafkaPusher(builder.brokers, builder.topic)); - } - } - - protected AvroSerializer<MetricReport> createSerializer(SchemaVersionWriter schemaVersionWriter) throws IOException { - return new AvroJsonSerializer<>(MetricReport.SCHEMA$, schemaVersionWriter); - } - - /** - * A static factory class for obtaining new {@link org.apache.gobblin.metrics.kafka.KafkaReporter.Builder}s - * - * @see org.apache.gobblin.metrics.kafka.KafkaReporter.Builder - */ - public static class BuilderFactory { - - public static BuilderImpl newBuilder() { - return new BuilderImpl(); - } - } - - public static class BuilderImpl extends Builder<BuilderImpl> { - - @Override - protected BuilderImpl self() { - return this; - } - } - - /** - * Builder for {@link KafkaReporter}. Defaults to no filter, reporting rates in seconds and times in milliseconds. - */ - public static abstract class Builder<T extends MetricReportReporter.Builder<T>> - extends MetricReportReporter.Builder<T> { - - protected String brokers; - protected String topic; - protected Optional<KafkaPusher> kafkaPusher; - - protected Builder() { - super(); - this.name = "KafkaReporter"; - this.kafkaPusher = Optional.absent(); - } - - /** - * Set {@link org.apache.gobblin.metrics.kafka.KafkaPusher} to use. 
- */ - public T withKafkaPusher(KafkaPusher pusher) { - this.kafkaPusher = Optional.of(pusher); - return self(); - } - - /** - * Builds and returns {@link KafkaReporter}. - * - * @param brokers string of Kafka brokers - * @param topic topic to send metrics to - * @return KafkaReporter - */ - public KafkaReporter build(String brokers, String topic, Properties props) throws IOException { - this.brokers = brokers; - this.topic = topic; - - return new KafkaReporter(this, ConfigUtils.propertiesToConfig(props, Optional.of(ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX))); - } - } - - @Override - protected void emitReport(MetricReport report) { - this.kafkaPusher.pushMessages(Lists.newArrayList(this.serializer.serializeRecord(report))); - } -} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java deleted file mode 100644 index 328a47b..0000000 --- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.metrics.kafka; - -import java.io.IOException; -import java.util.Properties; - -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.ScheduledReporter; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; - -import lombok.extern.slf4j.Slf4j; - -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.metrics.CustomCodahaleReporterFactory; -import org.apache.gobblin.metrics.KafkaReportingFormats; -import org.apache.gobblin.metrics.MetricContext; -import org.apache.gobblin.metrics.RootMetricContext; - - -@Slf4j -public class KafkaReporterFactory implements CustomCodahaleReporterFactory { - @Override - public ScheduledReporter newScheduledReporter(MetricRegistry registry, Properties properties) - throws IOException { - if (!Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_ENABLED_KEY, - ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_ENABLED))) { - return null; - } - log.info("Reporting metrics to Kafka"); - - Optional<String> defaultTopic = Optional.fromNullable(properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC)); - Optional<String> metricsTopic = Optional.fromNullable( - properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC_METRICS)); - Optional<String> eventsTopic = Optional.fromNullable( - properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC_EVENTS)); - - boolean metricsEnabled = metricsTopic.or(defaultTopic).isPresent(); - if (metricsEnabled) log.info("Reporting metrics to 
Kafka"); - boolean eventsEnabled = eventsTopic.or(defaultTopic).isPresent(); - if (eventsEnabled) log.info("Reporting events to Kafka"); - - try { - Preconditions.checkArgument(properties.containsKey(ConfigurationKeys.METRICS_KAFKA_BROKERS), - "Kafka metrics brokers missing."); - Preconditions.checkArgument(metricsTopic.or(eventsTopic).or(defaultTopic).isPresent(), "Kafka topic missing."); - } catch (IllegalArgumentException exception) { - log.error("Not reporting metrics to Kafka due to missing Kafka configuration(s).", exception); - return null; - } - - String brokers = properties.getProperty(ConfigurationKeys.METRICS_KAFKA_BROKERS); - - String reportingFormat = properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_FORMAT, - ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_FORMAT); - - KafkaReportingFormats formatEnum; - try { - formatEnum = KafkaReportingFormats.valueOf(reportingFormat.toUpperCase()); - } catch (IllegalArgumentException exception) { - log.warn("Kafka metrics reporting format " + reportingFormat + - " not recognized. Will report in json format.", exception); - formatEnum = KafkaReportingFormats.JSON; - } - - if (metricsEnabled) { - try { - formatEnum.metricReporterBuilder(properties) - .build(brokers, metricsTopic.or(defaultTopic).get(), properties); - } catch (IOException exception) { - log.error("Failed to create Kafka metrics reporter. Will not report metrics to Kafka.", exception); - } - } - - if (eventsEnabled) { - try { - KafkaEventReporter.Builder<?> builder = formatEnum.eventReporterBuilder(RootMetricContext.get(), - properties); - return builder.build(brokers, eventsTopic.or(defaultTopic).get()); - } catch (IOException exception) { - log.error("Failed to create Kafka events reporter. 
Will not report events to Kafka.", exception); - } - } - - log.info("Will start reporting metrics to Kafka"); - return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterTest.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterTest.java b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterTest.java deleted file mode 100644 index 066dce4..0000000 --- a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterTest.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gobblin.metrics.reporter; - -import java.io.IOException; -import java.util.Iterator; - -import org.testng.Assert; -import org.testng.annotations.Test; - -import org.apache.gobblin.metrics.GobblinTrackingEvent; -import org.apache.gobblin.metrics.MetricContext; -import org.apache.gobblin.metrics.reporter.util.EventUtils; -import org.apache.gobblin.metrics.kafka.KafkaAvroEventReporter; -import org.apache.gobblin.metrics.kafka.KafkaEventReporter; -import org.apache.gobblin.metrics.kafka.KafkaPusher; - - -@Test(groups = {"gobblin.metrics"}) -public class KafkaAvroEventReporterTest extends KafkaEventReporterTest { - - @Override - public KafkaEventReporter.Builder<? extends KafkaEventReporter.Builder> getBuilder(MetricContext context, - KafkaPusher pusher) { - return KafkaAvroEventReporter.forContext(context).withKafkaPusher(pusher); - } - - @Override - @SuppressWarnings("unchecked") - protected GobblinTrackingEvent nextEvent(Iterator<byte[]> it) - throws IOException { - Assert.assertTrue(it.hasNext()); - return EventUtils.deserializeReportFromAvroSerialization(new GobblinTrackingEvent(), it.next()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroReporterTest.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroReporterTest.java b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroReporterTest.java deleted file mode 100644 index bbf2646..0000000 --- a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroReporterTest.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.metrics.reporter; - -import java.io.IOException; -import java.util.Iterator; - -import org.testng.Assert; -import org.testng.annotations.Test; - -import org.apache.gobblin.metrics.MetricReport; -import org.apache.gobblin.metrics.reporter.util.MetricReportUtils; -import org.apache.gobblin.metrics.kafka.KafkaAvroReporter; -import org.apache.gobblin.metrics.kafka.KafkaPusher; -import org.apache.gobblin.metrics.kafka.KafkaReporter; - - -/** - * Test for KafkaAvroReporter - * Extends KafkaReporterTest and just redefines the builder and the metrics deserializer - * - * @author ibuenros - */ -@Test(groups = {"gobblin.metrics"}) -public class KafkaAvroReporterTest extends KafkaReporterTest { - - public KafkaAvroReporterTest(String topic) - throws IOException, InterruptedException { - super(); - } - - public KafkaAvroReporterTest() throws IOException, InterruptedException { - this("KafkaAvroReporterTest"); - } - - @Override - public KafkaReporter.Builder<? extends KafkaReporter.Builder> getBuilder(KafkaPusher pusher) { - return KafkaAvroReporter.BuilderFactory.newBuilder().withKafkaPusher(pusher); - } - - @Override - public KafkaReporter.Builder<? 
extends KafkaReporter.Builder> getBuilderFromContext(KafkaPusher pusher) { - return KafkaAvroReporter.BuilderFactory.newBuilder().withKafkaPusher(pusher); - } - - @Override - @SuppressWarnings("unchecked") - protected MetricReport nextReport(Iterator<byte[]> it) - throws IOException { - Assert.assertTrue(it.hasNext()); - return MetricReportUtils.deserializeReportFromAvroSerialization(new MetricReport(), it.next()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaEventReporterTest.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaEventReporterTest.java b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaEventReporterTest.java deleted file mode 100644 index 177f8d3..0000000 --- a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaEventReporterTest.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gobblin.metrics.reporter; - -import java.io.IOException; -import java.util.Iterator; -import java.util.Map; - -import org.testng.Assert; -import org.testng.annotations.Test; - -import com.google.common.collect.Maps; - -import org.apache.gobblin.metrics.GobblinTrackingEvent; -import org.apache.gobblin.metrics.MetricContext; -import org.apache.gobblin.metrics.Tag; -import org.apache.gobblin.metrics.reporter.util.EventUtils; -import org.apache.gobblin.metrics.kafka.KafkaEventReporter; -import org.apache.gobblin.metrics.kafka.KafkaPusher; - - -@Test(groups = {"gobblin.metrics"}) -public class KafkaEventReporterTest { - - /** - * Get builder for KafkaReporter (override if testing an extension of KafkaReporter) - * @param context metricregistry - * @return KafkaReporter builder - */ - public KafkaEventReporter.Builder<? extends KafkaEventReporter.Builder> getBuilder(MetricContext context, - KafkaPusher pusher) { - return KafkaEventReporter.Factory.forContext(context).withKafkaPusher(pusher); - } - - - @Test - public void testKafkaEventReporter() throws IOException { - MetricContext context = MetricContext.builder("context").build(); - - MockKafkaPusher pusher = new MockKafkaPusher(); - KafkaEventReporter kafkaReporter = getBuilder(context, pusher).build("localhost:0000", "topic"); - - String namespace = "gobblin.metrics.test"; - String eventName = "testEvent"; - - GobblinTrackingEvent event = new GobblinTrackingEvent(); - event.setName(eventName); - event.setNamespace(namespace); - Map<String, String> metadata = Maps.newHashMap(); - metadata.put("m1", "v1"); - metadata.put("m2", null); - event.setMetadata(metadata); - context.submitEvent(event); - - try { - Thread.sleep(100); - } catch(InterruptedException ex) { - Thread.currentThread().interrupt(); - } - - kafkaReporter.report(); - - try { - Thread.sleep(100); - } catch(InterruptedException ex) { - Thread.currentThread().interrupt(); - } - - GobblinTrackingEvent retrievedEvent = 
nextEvent(pusher.messageIterator()); - Assert.assertEquals(retrievedEvent.getNamespace(), namespace); - Assert.assertEquals(retrievedEvent.getName(), eventName); - Assert.assertEquals(retrievedEvent.getMetadata().size(), 4); - - } - - @Test - public void testTagInjection() throws IOException { - - String tag1 = "tag1"; - String value1 = "value1"; - String metadataValue1 = "metadata1"; - String tag2 = "tag2"; - String value2 = "value2"; - - MetricContext context = MetricContext.builder("context").addTag(new Tag<String>(tag1, value1)). - addTag(new Tag<String>(tag2, value2)).build(); - - MockKafkaPusher pusher = new MockKafkaPusher(); - KafkaEventReporter kafkaReporter = getBuilder(context, pusher).build("localhost:0000", "topic"); - - String namespace = "gobblin.metrics.test"; - String eventName = "testEvent"; - - GobblinTrackingEvent event = new GobblinTrackingEvent(); - event.setName(eventName); - event.setNamespace(namespace); - Map<String, String> metadata = Maps.newHashMap(); - metadata.put(tag1, metadataValue1); - event.setMetadata(metadata); - context.submitEvent(event); - - try { - Thread.sleep(100); - } catch(InterruptedException ex) { - Thread.currentThread().interrupt(); - } - - kafkaReporter.report(); - - try { - Thread.sleep(100); - } catch(InterruptedException ex) { - Thread.currentThread().interrupt(); - } - - GobblinTrackingEvent retrievedEvent = nextEvent(pusher.messageIterator()); - Assert.assertEquals(retrievedEvent.getNamespace(), namespace); - Assert.assertEquals(retrievedEvent.getName(), eventName); - Assert.assertEquals(retrievedEvent.getMetadata().size(), 4); - Assert.assertEquals(retrievedEvent.getMetadata().get(tag1), metadataValue1); - Assert.assertEquals(retrievedEvent.getMetadata().get(tag2), value2); - } - - /** - * Extract the next metric from the Kafka iterator - * Assumes existence of the metric has already been checked. 
- * @param it Kafka ConsumerIterator - * @return next metric in the stream - * @throws java.io.IOException - */ - protected GobblinTrackingEvent nextEvent(Iterator<byte[]> it) throws IOException { - Assert.assertTrue(it.hasNext()); - return EventUtils.deserializeReportFromJson(new GobblinTrackingEvent(), it.next()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaReporterTest.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaReporterTest.java b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaReporterTest.java deleted file mode 100644 index c431cb0..0000000 --- a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaReporterTest.java +++ /dev/null @@ -1,242 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gobblin.metrics.reporter; - -import java.io.IOException; -import java.util.Iterator; -import java.util.List; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Properties; -import java.util.Set; - -import org.testng.Assert; -import org.testng.annotations.Test; - -import com.codahale.metrics.Counter; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.Meter; - -import com.google.common.collect.Lists; - -import org.apache.gobblin.metrics.Measurements; -import org.apache.gobblin.metrics.Metric; -import org.apache.gobblin.metrics.MetricContext; -import org.apache.gobblin.metrics.MetricReport; -import org.apache.gobblin.metrics.Tag; -import org.apache.gobblin.metrics.kafka.KafkaPusher; -import org.apache.gobblin.metrics.kafka.KafkaReporter; -import org.apache.gobblin.metrics.reporter.util.MetricReportUtils; - - -@Test(groups = { "gobblin.metrics" }) -public class KafkaReporterTest { - - public KafkaReporterTest() throws IOException, InterruptedException {} - - /** - * Get builder for KafkaReporter (override if testing an extension of KafkaReporter) - * @return KafkaReporter builder - */ - public KafkaReporter.Builder<? extends KafkaReporter.Builder> getBuilder(KafkaPusher pusher) { - return KafkaReporter.BuilderFactory.newBuilder().withKafkaPusher(pusher); - } - - public KafkaReporter.Builder<? 
extends KafkaReporter.Builder> getBuilderFromContext(KafkaPusher pusher) { - return KafkaReporter.BuilderFactory.newBuilder().withKafkaPusher(pusher); - } - - @Test - public void testKafkaReporter() throws IOException { - MetricContext metricContext = - MetricContext.builder(this.getClass().getCanonicalName() + ".testKafkaReporter").build(); - Counter counter = metricContext.counter("com.linkedin.example.counter"); - Meter meter = metricContext.meter("com.linkedin.example.meter"); - Histogram histogram = metricContext.histogram("com.linkedin.example.histogram"); - - MockKafkaPusher pusher = new MockKafkaPusher(); - KafkaReporter kafkaReporter = getBuilder(pusher).build("localhost:0000", "topic", new Properties()); - - counter.inc(); - meter.mark(2); - histogram.update(1); - histogram.update(1); - histogram.update(2); - - kafkaReporter.report(metricContext); - - try { - Thread.sleep(1000); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - - Map<String, Double> expected = new HashMap<>(); - expected.put("com.linkedin.example.counter." + Measurements.COUNT, 1.0); - expected.put("com.linkedin.example.meter." + Measurements.COUNT, 2.0); - expected.put("com.linkedin.example.histogram." + Measurements.COUNT, 3.0); - - MetricReport nextReport = nextReport(pusher.messageIterator()); - - expectMetricsWithValues(nextReport, expected); - - kafkaReporter.report(metricContext); - - try { - Thread.sleep(1000); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - - Set<String> expectedSet = new HashSet<>(); - expectedSet.add("com.linkedin.example.counter." + Measurements.COUNT); - expectedSet.add("com.linkedin.example.meter." + Measurements.COUNT); - expectedSet.add("com.linkedin.example.meter." + Measurements.MEAN_RATE); - expectedSet.add("com.linkedin.example.meter." + Measurements.RATE_1MIN); - expectedSet.add("com.linkedin.example.meter." + Measurements.RATE_5MIN); - expectedSet.add("com.linkedin.example.meter." 
+ Measurements.RATE_15MIN); - expectedSet.add("com.linkedin.example.histogram." + Measurements.MEAN); - expectedSet.add("com.linkedin.example.histogram." + Measurements.MIN); - expectedSet.add("com.linkedin.example.histogram." + Measurements.MAX); - expectedSet.add("com.linkedin.example.histogram." + Measurements.MEDIAN); - expectedSet.add("com.linkedin.example.histogram." + Measurements.PERCENTILE_75TH); - expectedSet.add("com.linkedin.example.histogram." + Measurements.PERCENTILE_95TH); - expectedSet.add("com.linkedin.example.histogram." + Measurements.PERCENTILE_99TH); - expectedSet.add("com.linkedin.example.histogram." + Measurements.PERCENTILE_999TH); - expectedSet.add("com.linkedin.example.histogram." + Measurements.COUNT); - - nextReport = nextReport(pusher.messageIterator()); - expectMetrics(nextReport, expectedSet, true); - - kafkaReporter.close(); - - } - - @Test - public void kafkaReporterTagsTest() throws IOException { - MetricContext metricContext = - MetricContext.builder(this.getClass().getCanonicalName() + ".kafkaReporterTagsTest").build(); - Counter counter = metricContext.counter("com.linkedin.example.counter"); - - Tag<?> tag1 = new Tag<>("tag1", "value1"); - Tag<?> tag2 = new Tag<>("tag2", 2); - - MockKafkaPusher pusher = new MockKafkaPusher(); - KafkaReporter kafkaReporter = - getBuilder(pusher).withTags(Lists.newArrayList(tag1, tag2)).build("localhost:0000", "topic", new Properties()); - - counter.inc(); - - kafkaReporter.report(metricContext); - - try { - Thread.sleep(1000); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - - MetricReport metricReport = nextReport(pusher.messageIterator()); - - Assert.assertEquals(4, metricReport.getTags().size()); - Assert.assertTrue(metricReport.getTags().containsKey(tag1.getKey())); - Assert.assertEquals(metricReport.getTags().get(tag1.getKey()), tag1.getValue().toString()); - Assert.assertTrue(metricReport.getTags().containsKey(tag2.getKey())); - 
Assert.assertEquals(metricReport.getTags().get(tag2.getKey()), tag2.getValue().toString()); - } - - @Test - public void kafkaReporterContextTest() throws IOException { - Tag<?> tag1 = new Tag<>("tag1", "value1"); - MetricContext context = MetricContext.builder("context").addTag(tag1).build(); - Counter counter = context.counter("com.linkedin.example.counter"); - - MockKafkaPusher pusher = new MockKafkaPusher(); - KafkaReporter kafkaReporter = getBuilderFromContext(pusher).build("localhost:0000", "topic", new Properties()); - - counter.inc(); - - kafkaReporter.report(context); - - try { - Thread.sleep(1000); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - - MetricReport metricReport = nextReport(pusher.messageIterator()); - - Assert.assertEquals(3, metricReport.getTags().size()); - Assert.assertTrue(metricReport.getTags().containsKey(tag1.getKey())); - Assert.assertEquals(metricReport.getTags().get(tag1.getKey()), tag1.getValue().toString()); - - } - - /** - * Expect a list of metrics with specific values. - * Fail if not all metrics are received, or some metric has the wrong value. - * @param report MetricReport. - * @param expected map of expected metric names and their values - * @throws IOException - */ - private void expectMetricsWithValues(MetricReport report, Map<String, Double> expected) throws IOException { - List<Metric> metricIterator = report.getMetrics(); - - for (Metric metric : metricIterator) { - if (expected.containsKey(metric.getName())) { - Assert.assertEquals(expected.get(metric.getName()), metric.getValue()); - expected.remove(metric.getName()); - } - } - - Assert.assertTrue(expected.isEmpty()); - - } - - /** - * Expect a set of metric names. Will fail if not all of these metrics are received. 
- * @param report MetricReport - * @param expected set of expected metric names - * @param strict if set to true, will fail if receiving any metric that is not expected - * @throws IOException - */ - private void expectMetrics(MetricReport report, Set<String> expected, boolean strict) throws IOException { - List<Metric> metricIterator = report.getMetrics(); - for (Metric metric : metricIterator) { - //System.out.println(String.format("expectedSet.add(\"%s\")", metric.name)); - if (expected.contains(metric.getName())) { - expected.remove(metric.getName()); - } else if (strict && !metric.getName().contains(MetricContext.GOBBLIN_METRICS_NOTIFICATIONS_TIMER_NAME)) { - Assert.assertTrue(false, "Metric present in report not expected: " + metric.toString()); - } - } - Assert.assertTrue(expected.isEmpty()); - } - - /** - * Extract the next metric from the Kafka iterator - * Assumes existence of the metric has already been checked. - * @param it Kafka ConsumerIterator - * @return next metric in the stream - * @throws IOException - */ - protected MetricReport nextReport(Iterator<byte[]> it) throws IOException { - Assert.assertTrue(it.hasNext()); - return MetricReportUtils.deserializeReportFromJson(new MetricReport(), it.next()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java new file mode 100644 index 0000000..3d2de9b --- /dev/null +++ b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.metrics.kafka; + +import java.io.IOException; +import java.util.List; +import java.util.Properties; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.StringSerializer; + +import com.google.common.base.Optional; +import com.google.common.io.Closer; +import com.typesafe.config.Config; + +import org.apache.gobblin.util.ConfigUtils; + + +/** + * Establish a connection to a Kafka cluster and push byte messages to a specified topic. 
+ */ +public class KafkaProducerPusher implements Pusher { + + private final String topic; + private final KafkaProducer<String, byte[]> producer; + private final Closer closer; + + public KafkaProducerPusher(String brokers, String topic, Optional<Config> kafkaConfig) { + this.closer = Closer.create(); + + this.topic = topic; + + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + props.put(ProducerConfig.ACKS_CONFIG, "1"); + + // add the kafka scoped config. if any of the above are specified then they are overridden + if (kafkaConfig.isPresent()) { + props.putAll(ConfigUtils.configToProperties(kafkaConfig.get())); + } + + this.producer = createProducer(props); + } + + public KafkaProducerPusher(String brokers, String topic) { + this(brokers, topic, Optional.absent()); + } + + /** + * Push all byte array messages to the Kafka topic. + * @param messages List of byte array messages to push to Kafka. + */ + public void pushMessages(List<byte[]> messages) { + for (byte[] message: messages) { + this.producer.send(new ProducerRecord<String, byte[]>(topic, message)); + } + } + + @Override + public void close() + throws IOException { + this.closer.close(); + } + + /** + * Create the Kafka producer. 
+ */ + protected KafkaProducer<String, byte[]> createProducer(Properties props) { + return this.closer.register(new KafkaProducer<String, byte[]>(props)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaProducerPusherTest.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaProducerPusherTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaProducerPusherTest.java new file mode 100644 index 0000000..723f8b7 --- /dev/null +++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaProducerPusherTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.gobblin.metrics.reporter;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.typesafe.config.ConfigFactory;

import org.apache.gobblin.kafka.KafkaTestBase;
import org.apache.gobblin.metrics.kafka.KafkaProducerPusher;
import org.apache.gobblin.metrics.kafka.Pusher;

import kafka.consumer.ConsumerIterator;


/**
 * Test {@link org.apache.gobblin.metrics.kafka.KafkaProducerPusher}.
 *
 * <p>Pushes two messages through a {@link KafkaProducerPusher} and reads them back, in order, from the
 * servers started by {@link KafkaTestBase}.
 */
public class KafkaProducerPusherTest {
  public static final String TOPIC = KafkaProducerPusherTest.class.getSimpleName();

  private KafkaTestBase kafkaTestHelper;

  /**
   * Starts the servers managed by {@link KafkaTestBase} and provisions {@link #TOPIC}.
   */
  @BeforeClass
  public void setup() throws Exception {
    this.kafkaTestHelper = new KafkaTestBase();
    this.kafkaTestHelper.startServers();

    this.kafkaTestHelper.provisionTopic(TOPIC);
  }

  /**
   * Verifies that messages pushed by {@link KafkaProducerPusher} arrive on {@link #TOPIC} in push order, and
   * that the bootstrap servers given in the scoped config win over the (deliberately invalid) broker string.
   *
   * @throws IOException if closing the pusher fails
   */
  @Test
  public void test() throws IOException {
    // Test that the scoped config overrides the generic config
    Pusher pusher = new KafkaProducerPusher("localhost:dummy", TOPIC, Optional.of(ConfigFactory.parseMap(
        ImmutableMap.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "localhost:" + this.kafkaTestHelper.getKafkaServerPort()))));

    // Close the pusher even when an assertion below fails, so a failing run does not leak the producer.
    try {
      String msg1 = "msg1";
      String msg2 = "msg2";

      // Encode and decode with an explicit charset so the comparison does not depend on the platform default.
      pusher.pushMessages(Lists.newArrayList(msg1.getBytes(StandardCharsets.UTF_8),
          msg2.getBytes(StandardCharsets.UTF_8)));

      // Pause before consuming; presumably this gives the producer time to deliver the messages.
      try {
        Thread.sleep(1000);
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
      }

      ConsumerIterator<byte[], byte[]> iterator = this.kafkaTestHelper.getIteratorForTopic(TOPIC);

      // TestNG assertions instead of the `assert` keyword: the keyword is a no-op unless the JVM runs with -ea.
      Assert.assertTrue(iterator.hasNext(), "Expected a first message on topic " + TOPIC);
      Assert.assertEquals(new String(iterator.next().message(), StandardCharsets.UTF_8), msg1);
      Assert.assertTrue(iterator.hasNext(), "Expected a second message on topic " + TOPIC);
      Assert.assertEquals(new String(iterator.next().message(), StandardCharsets.UTF_8), msg2);
    } finally {
      pusher.close();
    }
  }

  /**
   * Shuts the test helper down. Failures are reported on stderr but never fail the run (best effort).
   */
  @AfterClass
  public void after() {
    try {
      this.kafkaTestHelper.close();
    } catch (Exception e) {
      System.err.println("Failed to close Kafka server: " + e);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java new file mode 100644 index 0000000..a0af52a --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */

package org.apache.gobblin.metrics;

import java.util.Properties;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.kafka.KafkaAvroEventReporter;
import org.apache.gobblin.metrics.kafka.KafkaAvroReporter;
import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
import org.apache.gobblin.metrics.kafka.KafkaReporter;


/**
 * Kafka reporting formats enumeration.
 *
 * <p>Each constant knows how to create the metric-reporter and event-reporter builders for its format.
 */
public enum KafkaReportingFormats {

  AVRO,
  JSON;

  /**
   * Get a {@link org.apache.gobblin.metrics.kafka.KafkaReporter.Builder} for this reporting format.
   *
   * <p>For {@link #AVRO}, a {@link KafkaAvroSchemaRegistry} built from {@code properties} is attached when
   * {@link ConfigurationKeys#METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY} is enabled.
   *
   * @param properties {@link Properties} containing information to build reporters.
   * @return {@link org.apache.gobblin.metrics.kafka.KafkaReporter.Builder}.
   */
  public KafkaReporter.Builder<?> metricReporterBuilder(Properties properties) {
    switch (this) {
      case AVRO:
        KafkaAvroReporter.Builder<?> builder = KafkaAvroReporter.BuilderFactory.newBuilder();
        if (useSchemaRegistry(properties)) {
          builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
        }
        return builder;
      case JSON:
        return KafkaReporter.BuilderFactory.newBuilder();
      default:
        // This should never happen.
        throw new IllegalArgumentException("KafkaReportingFormat not recognized.");
    }
  }

  /**
   * Get a {@link org.apache.gobblin.metrics.kafka.KafkaEventReporter.Builder} for this reporting format.
   *
   * <p>For {@link #AVRO}, a {@link KafkaAvroSchemaRegistry} built from {@code properties} is attached when
   * {@link ConfigurationKeys#METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY} is enabled.
   *
   * @param context {@link MetricContext} that should be reported.
   * @param properties {@link Properties} containing information to build reporters.
   * @return {@link org.apache.gobblin.metrics.kafka.KafkaEventReporter.Builder}.
   */
  public KafkaEventReporter.Builder<?> eventReporterBuilder(MetricContext context, Properties properties) {
    switch (this) {
      case AVRO:
        KafkaAvroEventReporter.Builder<?> builder = KafkaAvroEventReporter.Factory.forContext(context);
        if (useSchemaRegistry(properties)) {
          builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
        }
        return builder;
      case JSON:
        return KafkaEventReporter.Factory.forContext(context);
      default:
        // This should never happen.
        throw new IllegalArgumentException("KafkaReportingFormat not recognized.");
    }
  }

  /**
   * Reads the schema-registry switch shared by both builder methods.
   *
   * @param properties {@link Properties} to read the flag from; the configured default applies when it is unset.
   * @return {@code true} if the Avro reporters should be wired to a {@link KafkaAvroSchemaRegistry}.
   */
  private static boolean useSchemaRegistry(Properties properties) {
    // parseBoolean yields a primitive directly; Boolean.valueOf would box only to be unboxed by the caller's `if`.
    return Boolean.parseBoolean(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
        ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY));
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventReporter.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventReporter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventReporter.java new file mode 100644 index 0000000..ecb5d7d --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroEventReporter.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.gobblin.metrics.kafka;

import java.io.IOException;
import java.io.InputStream;

import org.apache.avro.Schema;

import com.google.common.base.Optional;

import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.reporter.util.AvroBinarySerializer;
import org.apache.gobblin.metrics.reporter.util.AvroSerializer;
import org.apache.gobblin.metrics.reporter.util.SchemaRegistryVersionWriter;
import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;


/**
 * {@link org.apache.gobblin.metrics.reporter.EventReporter} that emits events to Kafka as serialized Avro records.
 */
public class KafkaAvroEventReporter extends KafkaEventReporter {

  /** Classpath resource holding the Avro schema passed to the {@link SchemaRegistryVersionWriter}. */
  private static final String EVENT_SCHEMA_RESOURCE = "GobblinTrackingEvent.avsc";

  /**
   * Creates the reporter. When the builder carries a schema registry, the serializer's version writer is
   * replaced by a {@link SchemaRegistryVersionWriter} for the builder's topic.
   *
   * @param builder builder holding the reporter settings
   * @throws IOException if the parent reporter cannot be created or the Avro schema resource cannot be read
   */
  protected KafkaAvroEventReporter(Builder<?> builder) throws IOException {
    super(builder);
    if (builder.registry.isPresent()) {
      Schema schema = loadEventSchema();
      this.serializer.setSchemaVersionWriter(new SchemaRegistryVersionWriter(builder.registry.get(), builder.topic,
          Optional.of(schema)));
    }
  }

  /**
   * Parses {@link #EVENT_SCHEMA_RESOURCE} from the classpath.
   *
   * @return the parsed {@link Schema}
   * @throws IOException if the resource is missing or cannot be parsed
   */
  private Schema loadEventSchema() throws IOException {
    // The parser does not close the stream it is given, so close it here.
    try (InputStream schemaStream = getClass().getClassLoader().getResourceAsStream(EVENT_SCHEMA_RESOURCE)) {
      if (schemaStream == null) {
        // getResourceAsStream signals "not found" with null; fail with a clear message instead of inside the parser.
        throw new IOException("Avro schema resource not found on classpath: " + EVENT_SCHEMA_RESOURCE);
      }
      return new Schema.Parser().parse(schemaStream);
    }
  }

  /**
   * Creates the binary Avro serializer used for {@link GobblinTrackingEvent}s.
   *
   * @param schemaVersionWriter writer handed to the serializer
   * @return an {@link AvroBinarySerializer} for {@link GobblinTrackingEvent#SCHEMA$}
   */
  @Override
  protected AvroSerializer<GobblinTrackingEvent> createSerializer(SchemaVersionWriter schemaVersionWriter)
      throws IOException {
    return new AvroBinarySerializer<GobblinTrackingEvent>(GobblinTrackingEvent.SCHEMA$, schemaVersionWriter);
  }

  /**
   * Returns a new {@link Builder} for {@link KafkaAvroEventReporter}.
   *
   * @param context the {@link MetricContext} to report
   * @return KafkaAvroEventReporter builder
   * @deprecated this method is bugged. Use {@link Factory#forContext} instead.
   */
  @Deprecated
  public static Builder<? extends Builder<?>> forContext(MetricContext context) {
    return new BuilderImpl(context);
  }

  /** Concrete {@link Builder} whose {@link #self()} returns itself. */
  private static class BuilderImpl extends Builder<BuilderImpl> {
    private BuilderImpl(MetricContext context) {
      super(context);
    }

    @Override
    protected BuilderImpl self() {
      return this;
    }
  }

  /** Static factory for {@link Builder} instances. */
  public static abstract class Factory {
    /**
     * Returns a new {@link Builder} for {@link KafkaAvroEventReporter}.
     *
     * @param context the {@link MetricContext} to report
     * @return KafkaAvroEventReporter builder
     */
    public static BuilderImpl forContext(MetricContext context) {
      return new BuilderImpl(context);
    }
  }

  /**
   * Builder for {@link KafkaAvroEventReporter}. No schema registry is used unless one is supplied through
   * {@link #withSchemaRegistry(KafkaAvroSchemaRegistry)}.
   */
  public static abstract class Builder<T extends Builder<T>> extends KafkaEventReporter.Builder<T> {

    private Optional<KafkaAvroSchemaRegistry> registry = Optional.absent();

    private Builder(MetricContext context) {
      super(context);
    }

    /**
     * Sets the schema registry the reporter should use.
     *
     * @param registry registry to use; must not be {@code null}
     * @return this builder
     */
    public T withSchemaRegistry(KafkaAvroSchemaRegistry registry) {
      this.registry = Optional.of(registry);
      return self();
    }

    /**
     * Builds and returns {@link KafkaAvroEventReporter}.
     *
     * @param brokers string of Kafka brokers
     * @param topic topic to send events to
     * @return KafkaAvroEventReporter
     * @throws IOException if the reporter cannot be created
     */
    public KafkaAvroEventReporter build(String brokers, String topic) throws IOException {
      this.brokers = brokers;
      this.topic = topic;
      return new KafkaAvroEventReporter(this);
    }

  }

}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java new file mode 100644 index 0000000..4b6399b --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */

package org.apache.gobblin.metrics.kafka;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import org.apache.avro.Schema;

import com.google.common.base.Optional;
import com.typesafe.config.Config;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.MetricReport;
import org.apache.gobblin.metrics.reporter.util.AvroBinarySerializer;
import org.apache.gobblin.metrics.reporter.util.AvroSerializer;
import org.apache.gobblin.metrics.reporter.util.SchemaRegistryVersionWriter;
import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
import org.apache.gobblin.util.ConfigUtils;


/**
 * Kafka reporter for codahale metrics writing metrics in Avro format.
 *
 * @author ibuenros
 */
public class KafkaAvroReporter extends KafkaReporter {

  /** Classpath resource holding the Avro schema passed to the {@link SchemaRegistryVersionWriter}. */
  private static final String METRIC_SCHEMA_RESOURCE = "MetricReport.avsc";

  /**
   * Creates the reporter. When the builder carries a schema registry, the serializer's version writer is
   * replaced by a {@link SchemaRegistryVersionWriter} for the builder's topic.
   *
   * @param builder builder holding the reporter settings
   * @param config configuration forwarded to {@link KafkaReporter}
   * @throws IOException if the parent reporter cannot be created or the Avro schema resource cannot be read
   */
  protected KafkaAvroReporter(Builder<?> builder, Config config) throws IOException {
    super(builder, config);
    if (builder.registry.isPresent()) {
      Schema schema = loadMetricSchema();
      this.serializer.setSchemaVersionWriter(new SchemaRegistryVersionWriter(builder.registry.get(), builder.topic,
          Optional.of(schema)));
    }
  }

  /**
   * Parses {@link #METRIC_SCHEMA_RESOURCE} from the classpath.
   *
   * @return the parsed {@link Schema}
   * @throws IOException if the resource is missing or cannot be parsed
   */
  private Schema loadMetricSchema() throws IOException {
    // The parser does not close the stream it is given, so close it here.
    try (InputStream schemaStream = getClass().getClassLoader().getResourceAsStream(METRIC_SCHEMA_RESOURCE)) {
      if (schemaStream == null) {
        // getResourceAsStream signals "not found" with null; fail with a clear message instead of inside the parser.
        throw new IOException("Avro schema resource not found on classpath: " + METRIC_SCHEMA_RESOURCE);
      }
      return new Schema.Parser().parse(schemaStream);
    }
  }

  /**
   * Creates the binary Avro serializer used for {@link MetricReport}s.
   *
   * @param schemaVersionWriter writer handed to the serializer
   * @return an {@link AvroBinarySerializer} for {@link MetricReport#SCHEMA$}
   */
  @Override
  protected AvroSerializer<MetricReport> createSerializer(SchemaVersionWriter schemaVersionWriter)
      throws IOException {
    return new AvroBinarySerializer<>(MetricReport.SCHEMA$, schemaVersionWriter);
  }

  /**
   * A static factory class for obtaining new {@link Builder}s
   *
   * @see Builder
   */
  public static class BuilderFactory {

    public static BuilderImpl newBuilder() {
      return new BuilderImpl();
    }
  }

  /** Concrete {@link Builder} whose {@link #self()} returns itself. */
  public static class BuilderImpl extends Builder<BuilderImpl> {

    @Override
    protected BuilderImpl self() {
      return this;
    }
  }

  /**
   * Builder for {@link KafkaAvroReporter}. No schema registry is used unless one is supplied through
   * {@link #withSchemaRegistry(KafkaAvroSchemaRegistry)}.
   */
  public static abstract class Builder<T extends Builder<T>> extends KafkaReporter.Builder<T> {

    private Optional<KafkaAvroSchemaRegistry> registry = Optional.absent();

    /**
     * Sets the schema registry the reporter should use.
     *
     * @param registry registry to use; must not be {@code null}
     * @return this builder
     */
    public T withSchemaRegistry(KafkaAvroSchemaRegistry registry) {
      this.registry = Optional.of(registry);
      return self();
    }

    /**
     * Builds and returns {@link KafkaAvroReporter}.
     *
     * @param brokers string of Kafka brokers
     * @param topic topic to send metrics to
     * @param props properties converted to a {@link Config} under
     *        {@link ConfigurationKeys#METRICS_CONFIGURATIONS_PREFIX} and passed to the reporter
     * @return KafkaAvroReporter
     * @throws IOException if the reporter cannot be created
     */
    public KafkaAvroReporter build(String brokers, String topic, Properties props) throws IOException {
      this.brokers = brokers;
      this.topic = topic;
      return new KafkaAvroReporter(this,
          ConfigUtils.propertiesToConfig(props, Optional.of(ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX)));
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java index 4c155fb..6162636 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java @@ -31,7 +31,6 @@ import org.apache.commons.httpclient.methods.GetMethod; import org.apache.commons.httpclient.methods.PostMethod; import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; -import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,6 +39,7 @@ import com.google.common.base.Preconditions;
import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.kafka.schemareg.HttpClientFactory; +import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil; import org.apache.gobblin.util.AvroUtils; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java new file mode 100644 index 0000000..b15e96e --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.gobblin.metrics.kafka;

import java.io.IOException;
import java.util.List;
import java.util.Queue;

import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;

import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.reporter.EventReporter;
import org.apache.gobblin.metrics.reporter.util.AvroJsonSerializer;
import org.apache.gobblin.metrics.reporter.util.AvroSerializer;
import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;


/**
 * Reports {@link GobblinTrackingEvent} to a Kafka topic serialized as JSON.
 */
public class KafkaEventReporter extends EventReporter {

  protected final AvroSerializer<GobblinTrackingEvent> serializer;
  private final Pusher kafkaPusher;

  /**
   * Creates the reporter from a {@link Builder}. The serializer is registered with this reporter's closer.
   * The {@link Pusher} is the one set on the builder if present; otherwise one is obtained from
   * {@link PusherUtils} using the builder's pusher class name (or the default), brokers, topic and config.
   *
   * @param builder builder holding the reporter settings
   * @throws IOException if the parent reporter or the serializer cannot be created
   */
  public KafkaEventReporter(Builder<?> builder) throws IOException {
    super(builder);

    this.serializer = this.closer.register(createSerializer(new FixedSchemaVersionWriter()));

    // NOTE(review): unlike the serializer, the pusher is not registered with this.closer here; confirm who
    // is responsible for closing it.
    this.kafkaPusher = builder.kafkaPusher.isPresent()
        ? builder.kafkaPusher.get()
        : PusherUtils.getPusher(builder.pusherClassName.or(PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME),
            builder.brokers, builder.topic, builder.config);
  }

  /**
   * Drains {@code queue}, serializes every event and pushes the batch through the {@link Pusher}.
   * Nothing is pushed when the queue was empty.
   *
   * @param queue events to report; emptied by this call
   */
  @Override
  public void reportEventQueue(Queue<GobblinTrackingEvent> queue) {
    List<byte[]> serializedEvents = Lists.newArrayList();
    for (GobblinTrackingEvent event = queue.poll(); event != null; event = queue.poll()) {
      serializedEvents.add(this.serializer.serializeRecord(event));
    }

    if (serializedEvents.isEmpty()) {
      return;
    }
    this.kafkaPusher.pushMessages(serializedEvents);
  }

  /**
   * Creates the JSON Avro serializer used for {@link GobblinTrackingEvent}s. Subclasses may override this
   * to emit a different encoding.
   *
   * @param schemaVersionWriter writer handed to the serializer
   * @return an {@link AvroJsonSerializer} for {@link GobblinTrackingEvent#SCHEMA$}
   */
  protected AvroSerializer<GobblinTrackingEvent> createSerializer(SchemaVersionWriter schemaVersionWriter)
      throws IOException {
    return new AvroJsonSerializer<GobblinTrackingEvent>(GobblinTrackingEvent.SCHEMA$, schemaVersionWriter);
  }

  /**
   * Returns a new {@link Builder} for {@link KafkaEventReporter}.
   * Will automatically add all Context tags to the reporter.
   *
   * @param context the {@link MetricContext} to report
   * @return KafkaEventReporter builder
   * @deprecated this method is bugged. Use {@link Factory#forContext} instead.
   */
  @Deprecated
  public static Builder<? extends Builder> forContext(MetricContext context) {
    return new BuilderImpl(context);
  }

  /** Concrete {@link Builder} whose {@link #self()} returns itself. */
  public static class BuilderImpl extends Builder<BuilderImpl> {
    private BuilderImpl(MetricContext context) {
      super(context);
    }

    @Override
    protected BuilderImpl self() {
      return this;
    }
  }

  /** Static factory for {@link Builder} instances. */
  public static class Factory {
    /**
     * Returns a new {@link Builder} for {@link KafkaEventReporter}.
     * Will automatically add all Context tags to the reporter.
     *
     * @param context the {@link MetricContext} to report
     * @return KafkaEventReporter builder
     */
    public static BuilderImpl forContext(MetricContext context) {
      return new BuilderImpl(context);
    }
  }

  /**
   * Builder for {@link KafkaEventReporter}. By default no {@link Pusher}, extra {@link Config} or pusher
   * class name is set.
   */
  public static abstract class Builder<T extends EventReporter.Builder<T>>
      extends EventReporter.Builder<T> {
    protected String brokers;
    protected String topic;
    protected Optional<Pusher> kafkaPusher = Optional.absent();
    protected Optional<Config> config = Optional.absent();
    protected Optional<String> pusherClassName = Optional.absent();

    protected Builder(MetricContext context) {
      super(context);
    }

    /**
     * Set {@link Pusher} to use.
     */
    public T withKafkaPusher(Pusher pusher) {
      this.kafkaPusher = Optional.of(pusher);
      return self();
    }

    /**
     * Set additional configuration.
     */
    public T withConfig(Config config) {
      this.config = Optional.of(config);
      return self();
    }

    /**
     * Set a {@link Pusher} class name
     */
    public T withPusherClassName(String pusherClassName) {
      this.pusherClassName = Optional.of(pusherClassName);
      return self();
    }

    /**
     * Builds and returns {@link KafkaEventReporter}.
     *
     * @param brokers string of Kafka brokers
     * @param topic topic to send events to
     * @return KafkaEventReporter
     * @throws IOException if the reporter cannot be created
     */
    public KafkaEventReporter build(String brokers, String topic) throws IOException {
      this.brokers = brokers;
      this.topic = topic;
      return new KafkaEventReporter(this);
    }

  }
}
