[GOBBLIN-421] Add parameterized type for Pusher message type Closes #2298 from htran1/pusher_message_type
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/8636b0cc Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/8636b0cc Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/8636b0cc Branch: refs/heads/0.12.0 Commit: 8636b0ccabb517c9783287a7a902e3881a878141 Parents: ca5835b Author: Hung Tran <[email protected]> Authored: Sun Mar 4 16:15:50 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Sun Mar 4 16:15:50 2018 -0800 ---------------------------------------------------------------------- .../metrics/kafka/KafkaProducerPusher.java | 2 +- .../gobblin/metrics/kafka/KafkaPusher.java | 2 +- .../metrics/kafka/KafkaProducerPusher.java | 2 +- .../gobblin/metrics/kafka/LoggingPusher.java | 68 ++++++++++++++++++++ .../gobblin/metrics/kafka/NoopPusher.java | 47 ++++++++++++++ .../apache/gobblin/metrics/kafka/Pusher.java | 4 +- .../metrics/kafka/LoggingPusherTest.java | 64 ++++++++++++++++++ .../metrics/reporter/MockKafkaPusher.java | 2 +- 8 files changed, 185 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8636b0cc/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java index ff75a92..d83cc36 100644 --- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java +++ b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java @@ -37,7 +37,7 @@ import org.apache.gobblin.util.ConfigUtils; /** * Establishes a connection to a Kafka cluster and push byte messages to a specified topic. */ -public class KafkaProducerPusher implements Pusher { +public class KafkaProducerPusher implements Pusher<byte[]> { private final String topic; private final KafkaProducer<String, byte[]> producer; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8636b0cc/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaPusher.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaPusher.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaPusher.java index 1c977ff..b32899c 100644 --- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaPusher.java +++ b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaPusher.java @@ -33,7 +33,7 @@ import kafka.producer.ProducerConfig; /** * Establishes a connection to a Kafka cluster and pushed byte messages to a specified topic. */ -public class KafkaPusher implements Pusher { +public class KafkaPusher implements Pusher<byte[]> { private final String topic; private final ProducerCloseable<String, byte[]> producer; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8636b0cc/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java index 3d2de9b..52d416b 100644 --- a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java +++ b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java @@ -37,7 +37,7 @@ import org.apache.gobblin.util.ConfigUtils; /** * Establish a connection to a Kafka cluster and push byte messages to a specified topic. */ -public class KafkaProducerPusher implements Pusher { +public class KafkaProducerPusher implements Pusher<byte[]> { private final String topic; private final KafkaProducer<String, byte[]> producer; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8636b0cc/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/LoggingPusher.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/LoggingPusher.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/LoggingPusher.java new file mode 100644 index 0000000..b86287e --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/LoggingPusher.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.metrics.kafka; + +import java.io.IOException; +import java.util.List; + +import com.google.common.base.Optional; +import com.typesafe.config.Config; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.util.ConfigUtils; + +import lombok.extern.slf4j.Slf4j; + +/** + * This is a {@Pusher} class that logs the messages + * @param <M> message type + */ +@Slf4j +public class LoggingPusher<M> implements Pusher<M> { + private final String brokers; + private final String topic; + private static final String KAFKA_TOPIC = "kafka.topic"; + private static final String NO_BROKERS = "NoBrokers"; + private static final String NO_TOPIC = "NoTopic"; + + public LoggingPusher() { + this(NO_BROKERS, NO_TOPIC, Optional.absent()); + } + + public LoggingPusher(Config config) { + this.brokers = ConfigUtils.getString(config, ConfigurationKeys.KAFKA_BROKERS, NO_BROKERS); + this.topic = ConfigUtils.getString(config, KAFKA_TOPIC, NO_TOPIC); + } + + /** + * Constructor like the one in KafkaProducerPusher for compatibility + */ + public LoggingPusher(String brokers, String topic, Optional<Config> kafkaConfig) { + this.brokers = brokers; + this.topic = topic; + } + + public void pushMessages(List<M> messages) { + for (M message: messages) { + log.info("Pushing to {}:{}: {}", this.brokers, this.topic, message.toString()); + } + } + + @Override + public void close() throws IOException { + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8636b0cc/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/NoopPusher.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/NoopPusher.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/NoopPusher.java new file mode 100644 index 0000000..2c1edd5 --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/NoopPusher.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.metrics.kafka; + +import java.io.IOException; +import java.util.List; + +import com.google.common.base.Optional; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + + +/** + * This is a {@Pusher} class that ignores the messages + * @param <M> message type + */ +@Slf4j +public class NoopPusher<M> implements Pusher<M> { + public NoopPusher() {} + + public NoopPusher(Config config) {} + + /** + * Constructor like the one in KafkaProducerPusher for compatibility + */ + public NoopPusher(String brokers, String topic, Optional<Config> kafkaConfig) {} + + public void pushMessages(List<M> messages) {} + + @Override + public void close() throws IOException {} +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8636b0cc/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java index 5abd503..9024a88 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java @@ -24,10 +24,10 @@ import java.util.List; /** * Establish a connection to a Kafka cluster and push byte messages to a specified topic. */ -public interface Pusher extends Closeable { +public interface Pusher<M> extends Closeable { /** * Push all byte array messages to the Kafka topic. * @param messages List of byte array messages to push to Kakfa. */ - void pushMessages(List<byte[]> messages); + void pushMessages(List<M> messages); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8636b0cc/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/LoggingPusherTest.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/LoggingPusherTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/LoggingPusherTest.java new file mode 100644 index 0000000..3e861de --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/LoggingPusherTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.metrics.kafka; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.log4j.AppenderSkeleton; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.log4j.spi.LoggingEvent; +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; + + +@Test +public class LoggingPusherTest { + + @Test + public void testKafkaReporter() { + + TestAppender testAppender = new TestAppender(); + Logger logger = LogManager.getLogger(LoggingPusher.class.getName()); + logger.addAppender(testAppender); + + LoggingPusher<String> loggingPusher = new LoggingPusher<String>("broker", "topic", Optional.absent()); + + loggingPusher.pushMessages(ImmutableList.of("message1", "message2")); + + Assert.assertEquals(testAppender.events.size(), 2); + Assert.assertEquals(testAppender.events.get(0).getRenderedMessage(), "Pushing to broker:topic: message1"); + Assert.assertEquals(testAppender.events.get(1).getRenderedMessage(), "Pushing to broker:topic: message2"); + + logger.removeAppender(testAppender); + } + + + private class TestAppender extends AppenderSkeleton { + List<LoggingEvent> events = new ArrayList<LoggingEvent>(); + public void close() {} + public boolean requiresLayout() {return false;} + @Override + protected void append(LoggingEvent event) { + events.add(event); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8636b0cc/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKafkaPusher.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKafkaPusher.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKafkaPusher.java index 71decbb..8bb827f 100644 --- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKafkaPusher.java +++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKafkaPusher.java @@ -31,7 +31,7 @@ import org.apache.gobblin.metrics.kafka.Pusher; /** * Mock instance of {@link org.apache.gobblin.metrics.kafka.Pusher} used for testing. */ -public class MockKafkaPusher implements Pusher { +public class MockKafkaPusher implements Pusher<byte[]> { Queue<byte[]> messages = Queues.newLinkedBlockingQueue();
