Repository: incubator-gobblin Updated Branches: refs/heads/master 25d0a7d1a -> 10294e4b1
[GOBBLIN-640] Add a Kafka producer pusher that supports keyed messages Closes #2510 from htran1/kafka_key_value_pusher Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/10294e4b Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/10294e4b Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/10294e4b Branch: refs/heads/master Commit: 10294e4b16caf28090a6641877aabb46d4c7fa2e Parents: 25d0a7d Author: Hung Tran <[email protected]> Authored: Wed Nov 28 11:19:42 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Wed Nov 28 11:19:42 2018 -0800 ---------------------------------------------------------------------- .../kafka/KafkaKeyValueProducerPusher.java | 101 +++++++++++++++++++ .../kafka/KafkaKeyValueProducerPusher.java | 101 +++++++++++++++++++ .../KafkaKeyValueProducerPusherTest.java | 101 +++++++++++++++++++ 3 files changed, 303 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/10294e4b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java new file mode 100644 index 0000000..ec930fc --- /dev/null +++ b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.metrics.kafka; + +import java.io.IOException; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.StringSerializer; + +import com.google.common.base.Optional; +import com.google.common.io.Closer; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.util.ConfigUtils; + + +/** + * Establishes a connection to a Kafka cluster and pushes keyed messages to a specified topic. By default keys use {@link StringSerializer} and values use {@link ByteArraySerializer}; the supplied Kafka config can override both. 
+ * @param <K> key type + * @param <V> value type + */ +@Slf4j +public class KafkaKeyValueProducerPusher<K, V> implements Pusher<Pair<K, V>> { + private final String topic; + private final KafkaProducer<K, V> producer; + private final Closer closer; + + public KafkaKeyValueProducerPusher(String brokers, String topic, Optional<Config> kafkaConfig) { + this.closer = Closer.create(); + + this.topic = topic; + + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + props.put(ProducerConfig.ACKS_CONFIG, "all"); + props.put(ProducerConfig.RETRIES_CONFIG, 3); + + // add the kafka scoped config. if any of the above are specified then they are overridden + if (kafkaConfig.isPresent()) { + props.putAll(ConfigUtils.configToProperties(kafkaConfig.get())); + } + + this.producer = createProducer(props); + } + + public KafkaKeyValueProducerPusher(String brokers, String topic) { + this(brokers, topic, Optional.absent()); + } + + /** + * Push all keyed messages to the Kafka topic. Each send registers a callback that logs an error if the send completes with an exception. + * @param messages List of keyed messages to push to Kafka. + */ + public void pushMessages(List<Pair<K, V>> messages) { + for (Pair<K, V> message: messages) { + this.producer.send(new ProducerRecord<>(topic, message.getKey(), message.getValue()), (recordMetadata, e) -> { + if (e != null) { + log.error("Failed to send message to topic {} due to exception: ", topic, e); + } + }); + } + } + + @Override + public void close() + throws IOException { + this.closer.close(); + } + + /** + * Create the Kafka producer and register it with the closer that {@link #close()} closes. 
+ */ + protected KafkaProducer<K, V> createProducer(Properties props) { + return this.closer.register(new KafkaProducer<K, V>(props)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/10294e4b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java new file mode 100644 index 0000000..ba0e5ff --- /dev/null +++ b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.metrics.kafka; + +import java.io.IOException; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.StringSerializer; + +import com.google.common.base.Optional; +import com.google.common.io.Closer; +import com.typesafe.config.Config; + +import org.apache.gobblin.util.ConfigUtils; + +import lombok.extern.slf4j.Slf4j; + + +/** + * Establishes a connection to a Kafka cluster and pushes keyed messages to a specified topic. By default keys use {@link StringSerializer} and values use {@link ByteArraySerializer}; the supplied Kafka config can override both. + * @param <K> key type + * @param <V> value type + */ +@Slf4j +public class KafkaKeyValueProducerPusher<K, V> implements Pusher<Pair<K, V>> { + private final String topic; + private final KafkaProducer<K, V> producer; + private final Closer closer; + + public KafkaKeyValueProducerPusher(String brokers, String topic, Optional<Config> kafkaConfig) { + this.closer = Closer.create(); + + this.topic = topic; + + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + props.put(ProducerConfig.ACKS_CONFIG, "all"); + props.put(ProducerConfig.RETRIES_CONFIG, 3); + + // add the kafka scoped config. 
if any of the above are specified then they are overridden + if (kafkaConfig.isPresent()) { + props.putAll(ConfigUtils.configToProperties(kafkaConfig.get())); + } + + this.producer = createProducer(props); + } + + public KafkaKeyValueProducerPusher(String brokers, String topic) { + this(brokers, topic, Optional.absent()); + } + + /** + * Push all keyed messages to the Kafka topic. Each send registers a callback that logs an error if the send completes with an exception. + * @param messages List of keyed messages to push to Kafka. + */ + public void pushMessages(List<Pair<K, V>> messages) { + for (Pair<K, V> message: messages) { + this.producer.send(new ProducerRecord<>(topic, message.getKey(), message.getValue()), (recordMetadata, e) -> { + if (e != null) { + log.error("Failed to send message to topic {} due to exception: ", topic, e); + } + }); + } + } + + @Override + public void close() + throws IOException { + this.closer.close(); + } + + /** + * Create the Kafka producer and register it with the closer that {@link #close()} closes. + */ + protected KafkaProducer<K, V> createProducer(Properties props) { + return this.closer.register(new KafkaProducer<K, V>(props)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/10294e4b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaKeyValueProducerPusherTest.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaKeyValueProducerPusherTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaKeyValueProducerPusherTest.java new file mode 100644 index 0000000..1681d7b --- /dev/null +++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaKeyValueProducerPusherTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.metrics.reporter; + +import java.io.IOException; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.typesafe.config.ConfigFactory; + +import kafka.consumer.ConsumerIterator; +import kafka.message.MessageAndMetadata; + +import org.apache.gobblin.kafka.KafkaTestBase; +import org.apache.gobblin.metrics.kafka.KafkaKeyValueProducerPusher; +import org.apache.gobblin.metrics.kafka.Pusher; + + +/** + * Test {@link KafkaKeyValueProducerPusher}. 
+ */ +public class KafkaKeyValueProducerPusherTest { + public static final String TOPIC = KafkaKeyValueProducerPusherTest.class.getSimpleName(); + + private KafkaTestBase kafkaTestHelper; + + @BeforeClass + public void setup() throws Exception { + kafkaTestHelper = new KafkaTestBase(); + kafkaTestHelper.startServers(); + + kafkaTestHelper.provisionTopic(TOPIC); + } + + @Test + public void test() throws IOException { + // Test that the scoped config overrides the generic config: the brokers argument is a dummy address + Pusher<Pair<String, byte[]>> pusher = new KafkaKeyValueProducerPusher<String, byte[]>("localhost:dummy", TOPIC, + Optional.of(ConfigFactory.parseMap(ImmutableMap.of( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + this.kafkaTestHelper.getKafkaServerPort())))); + + String msg1 = "msg1"; + String msg2 = "msg2"; + + pusher.pushMessages(Lists.newArrayList(Pair.of("key1", msg1.getBytes()), Pair.of("key2", msg2.getBytes()))); + + try { + Thread.sleep(1000); + } catch(InterruptedException ex) { + Thread.currentThread().interrupt(); + } + + ConsumerIterator<byte[], byte[]> iterator = this.kafkaTestHelper.getIteratorForTopic(TOPIC); + + Assert.assertTrue(iterator.hasNext()); + + MessageAndMetadata<byte[], byte[]> messageAndMetadata = iterator.next(); + + Assert.assertEquals(new String(messageAndMetadata.key()), "key1"); + Assert.assertEquals(new String(messageAndMetadata.message()), msg1); + Assert.assertTrue(iterator.hasNext()); + + messageAndMetadata = iterator.next(); + Assert.assertEquals(new String(messageAndMetadata.key()), "key2"); + Assert.assertEquals(new String(messageAndMetadata.message()), msg2); + + pusher.close(); + } + + @AfterClass + public void after() { + try { + this.kafkaTestHelper.close(); + } catch(Exception e) { + System.err.println("Failed to close Kafka server."); + } + } +}
