Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 25d0a7d1a -> 10294e4b1


[GOBBLIN-640] Add a Kafka producer pusher that supports keyed messages

Closes #2510 from htran1/kafka_key_value_pusher


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/10294e4b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/10294e4b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/10294e4b

Branch: refs/heads/master
Commit: 10294e4b16caf28090a6641877aabb46d4c7fa2e
Parents: 25d0a7d
Author: Hung Tran <[email protected]>
Authored: Wed Nov 28 11:19:42 2018 -0800
Committer: Hung Tran <[email protected]>
Committed: Wed Nov 28 11:19:42 2018 -0800

----------------------------------------------------------------------
 .../kafka/KafkaKeyValueProducerPusher.java      | 101 +++++++++++++++++++
 .../kafka/KafkaKeyValueProducerPusher.java      | 101 +++++++++++++++++++
 .../KafkaKeyValueProducerPusherTest.java        | 101 +++++++++++++++++++
 3 files changed, 303 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/10294e4b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
 
b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
new file mode 100644
index 0000000..ec930fc
--- /dev/null
+++ 
b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import com.google.common.base.Optional;
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Establishes a connection to a Kafka cluster and push keyed messages to a 
specified topic.
+ * @param <K> key type
+ * @param <V> value type
+ */
+@Slf4j
+public class KafkaKeyValueProducerPusher<K, V> implements Pusher<Pair<K, V>> {
+  private final String topic;
+  private final KafkaProducer<K, V> producer;
+  private final Closer closer;
+
+  public KafkaKeyValueProducerPusher(String brokers, String topic, 
Optional<Config> kafkaConfig) {
+    this.closer = Closer.create();
+
+    this.topic = topic;
+
+    Properties props = new Properties();
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
+    props.put(ProducerConfig.ACKS_CONFIG, "all");
+    props.put(ProducerConfig.RETRIES_CONFIG, 3);
+
+    // add the kafka scoped config. if any of the above are specified then 
they are overridden
+    if (kafkaConfig.isPresent()) {
+      props.putAll(ConfigUtils.configToProperties(kafkaConfig.get()));
+    }
+
+    this.producer = createProducer(props);
+  }
+
+  public KafkaKeyValueProducerPusher(String brokers, String topic) {
+    this(brokers, topic, Optional.absent());
+  }
+
+  /**
+   * Push all keyed messages to the Kafka topic.
+   * @param messages List of keyed messages to push to Kakfa.
+   */
+  public void pushMessages(List<Pair<K, V>> messages) {
+    for (Pair<K, V> message: messages) {
+      this.producer.send(new ProducerRecord<>(topic, message.getKey(), 
message.getValue()), (recordMetadata, e) -> {
+        if (e != null) {
+          log.error("Failed to send message to topic {} due to exception: ", 
topic, e);
+        }
+      });
+    }
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    this.closer.close();
+  }
+
+  /**
+   * Create the Kafka producer.
+   */
+  protected KafkaProducer<K, V> createProducer(Properties props) {
+    return this.closer.register(new KafkaProducer<K, V>(props));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/10294e4b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
 
b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
new file mode 100644
index 0000000..ba0e5ff
--- /dev/null
+++ 
b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import com.google.common.base.Optional;
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.util.ConfigUtils;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * Establishes a connection to a Kafka cluster and push keyed messages to a 
specified topic.
+ * @param <K> key type
+ * @param <V> value type
+ */
+@Slf4j
+public class KafkaKeyValueProducerPusher<K, V> implements Pusher<Pair<K, V>> {
+  private final String topic;
+  private final KafkaProducer<K, V> producer;
+  private final Closer closer;
+
+  public KafkaKeyValueProducerPusher(String brokers, String topic, 
Optional<Config> kafkaConfig) {
+    this.closer = Closer.create();
+
+    this.topic = topic;
+
+    Properties props = new Properties();
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
+    props.put(ProducerConfig.ACKS_CONFIG, "all");
+    props.put(ProducerConfig.RETRIES_CONFIG, 3);
+
+    // add the kafka scoped config. if any of the above are specified then 
they are overridden
+    if (kafkaConfig.isPresent()) {
+      props.putAll(ConfigUtils.configToProperties(kafkaConfig.get()));
+    }
+
+    this.producer = createProducer(props);
+  }
+
+  public KafkaKeyValueProducerPusher(String brokers, String topic) {
+    this(brokers, topic, Optional.absent());
+  }
+
+  /**
+   * Push all keyed messages to the Kafka topic.
+   * @param messages List of keyed messages to push to Kakfa.
+   */
+  public void pushMessages(List<Pair<K, V>> messages) {
+    for (Pair<K, V> message: messages) {
+      this.producer.send(new ProducerRecord<>(topic, message.getKey(), 
message.getValue()), (recordMetadata, e) -> {
+        if (e != null) {
+          log.error("Failed to send message to topic {} due to exception: ", 
topic, e);
+        }
+      });
+    }
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    this.closer.close();
+  }
+
+  /**
+   * Create the Kafka producer.
+   */
+  protected KafkaProducer<K, V> createProducer(Properties props) {
+    return this.closer.register(new KafkaProducer<K, V>(props));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/10294e4b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaKeyValueProducerPusherTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaKeyValueProducerPusherTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaKeyValueProducerPusherTest.java
new file mode 100644
index 0000000..1681d7b
--- /dev/null
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaKeyValueProducerPusherTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import java.io.IOException;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.typesafe.config.ConfigFactory;
+
+import kafka.consumer.ConsumerIterator;
+import kafka.message.MessageAndMetadata;
+
+import org.apache.gobblin.kafka.KafkaTestBase;
+import org.apache.gobblin.metrics.kafka.KafkaKeyValueProducerPusher;
+import org.apache.gobblin.metrics.kafka.Pusher;
+
+
+/**
+ * Test {@link KafkaKeyValueProducerPusher}.
+ */
+public class KafkaKeyValueProducerPusherTest {
+  public static final String TOPIC = 
KafkaKeyValueProducerPusherTest.class.getSimpleName();
+
+  private KafkaTestBase kafkaTestHelper;
+
+  @BeforeClass
+  public void setup() throws Exception {
+    kafkaTestHelper = new KafkaTestBase();
+    kafkaTestHelper.startServers();
+
+    kafkaTestHelper.provisionTopic(TOPIC);
+  }
+
+  @Test
+  public void test() throws IOException {
+    // Test that the scoped config overrides the generic config
+    Pusher pusher = new KafkaKeyValueProducerPusher<byte[], 
byte[]>("localhost:dummy", TOPIC,
+        Optional.of(ConfigFactory.parseMap(ImmutableMap.of(
+            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + 
this.kafkaTestHelper.getKafkaServerPort()))));
+
+    String msg1 = "msg1";
+    String msg2 = "msg2";
+
+    pusher.pushMessages(Lists.newArrayList(Pair.of("key1", msg1.getBytes()), 
Pair.of("key2", msg2.getBytes())));
+
+    try {
+      Thread.sleep(1000);
+    } catch(InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    ConsumerIterator<byte[], byte[]> iterator = 
this.kafkaTestHelper.getIteratorForTopic(TOPIC);
+
+    assert(iterator.hasNext());
+
+    MessageAndMetadata<byte[], byte[]> messageAndMetadata = iterator.next();
+
+    Assert.assertEquals(new String(messageAndMetadata.key()), "key1");
+    Assert.assertEquals(new String(messageAndMetadata.message()), msg1);
+    assert(iterator.hasNext());
+
+    messageAndMetadata = iterator.next();
+    Assert.assertEquals(new String(messageAndMetadata.key()), "key2");
+    Assert.assertEquals(new String(messageAndMetadata.message()), msg2);
+
+    pusher.close();
+  }
+
+  @AfterClass
+  public void after() {
+    try {
+      this.kafkaTestHelper.close();
+    } catch(Exception e) {
+      System.err.println("Failed to close Kafka server.");
+    }
+  }
+}

Reply via email to