This is an automated email from the ASF dual-hosted git repository.

nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a28a52  Kafka bolt (#3324)
6a28a52 is described below

commit 6a28a52aff4bb573ee8a11e9f108f4e3eeeeede2
Author: SiMing Weng <[email protected]>
AuthorDate: Thu Aug 8 13:15:12 2019 -0400

    Kafka bolt (#3324)
    
    * update KafkaSpout
    
    * Revert "update KafkaSpout"
    
    This reverts commit dc64e9ad
    
    * implement a Kafka Bolt
    
    * add unit tests for Effective Once and At Most Once mode
---
 contrib/bolts/kafka/src/java/BUILD                 |  29 +++++
 .../bolts/kafka/DefaultKafkaProducerFactory.java   |  38 ++++++
 .../org/apache/heron/bolts/kafka/KafkaBolt.java    | 106 ++++++++++++++++
 .../heron/bolts/kafka/KafkaProducerFactory.java    |  26 ++++
 .../apache/heron/bolts/kafka/TupleTransformer.java |  43 +++++++
 contrib/bolts/kafka/test/java/BUILD                |  23 ++++
 .../kafka/DefaultKafkaProducerFactoryTest.java     |  41 ++++++
 .../apache/heron/bolts/kafka/KafkaBoltTest.java    | 137 +++++++++++++++++++++
 8 files changed, 443 insertions(+)

diff --git a/contrib/bolts/kafka/src/java/BUILD b/contrib/bolts/kafka/src/java/BUILD
new file mode 100644
index 0000000..1df060c
--- /dev/null
+++ b/contrib/bolts/kafka/src/java/BUILD
@@ -0,0 +1,29 @@
+licenses(["notice"])
+
+package(default_visibility = ["//visibility:public"])
+
+load("//tools/rules:build_defs.bzl", "DOCLINT_HTML_AND_SYNTAX")
+load("//tools/rules:javadoc.bzl", "java_doc")
+
+# Kafka bolt library: every Java source below org/apache/heron/bolts/kafka,
+# compiled with the doclint options loaded from build_defs.bzl.
+java_library(
+    name = "heron-kafka-bolt-java",
+    srcs = glob(["org/apache/heron/bolts/kafka/**/*.java"]),
+    javacopts = DOCLINT_HTML_AND_SYNTAX,
+    deps = [
+        "//storm-compatibility/src/java:storm-compatibility-java-neverlink",
+        "//heron/api/src/java:api-java-low-level",
+        "//heron/common/src/java:basics-java",
+        "//heron/common/src/java:config-java",
+        "//third_party/java:logging",
+        "@org_apache_kafka_kafka_clients//jar",
+    ],
+)
+
+# Javadoc target generated from the library declared above.
+java_doc(
+    name = "heron-kafka-bolt-javadoc",
+    libs = [":heron-kafka-bolt-java"],
+    pkgs = ["org/apache/heron/bolts/kafka"],
+    title = "Kafka Bolt Documentation",
+)
\ No newline at end of file
diff --git a/contrib/bolts/kafka/src/java/org/apache/heron/bolts/kafka/DefaultKafkaProducerFactory.java b/contrib/bolts/kafka/src/java/org/apache/heron/bolts/kafka/DefaultKafkaProducerFactory.java
new file mode 100644
index 0000000..3fdd223
--- /dev/null
+++ b/contrib/bolts/kafka/src/java/org/apache/heron/bolts/kafka/DefaultKafkaProducerFactory.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.heron.bolts.kafka;
+
+import java.util.Map;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+
+/**
+ * {@link KafkaProducerFactory} that builds a {@link KafkaProducer} straight from a map of
+ * producer configuration properties.
+ *
+ * @param <K> type of the record key
+ * @param <V> type of the record value
+ */
+@SuppressWarnings("unused")
+public class DefaultKafkaProducerFactory<K, V> implements KafkaProducerFactory<K, V> {
+  private static final long serialVersionUID = -2184222924531815883L;
+  // Assigned once in the constructor and never rebound, hence final.
+  // NOTE(review): this class is Serializable through KafkaProducerFactory, so the supplied map
+  // presumably has to be serializable as well - confirm against the callers.
+  private final Map<String, Object> configs;
+
+  /**
+   * Creates a factory backed by the given producer configuration.
+   *
+   * @param configs properties passed unchanged to the {@link KafkaProducer} constructor on
+   * every {@link #create()} call; the map is stored by reference, not copied
+   */
+  public DefaultKafkaProducerFactory(Map<String, Object> configs) {
+    this.configs = configs;
+  }
+
+  /**
+   * Builds a producer from the stored configuration.
+   *
+   * @return a newly constructed {@link KafkaProducer}; each call creates another instance
+   */
+  @Override
+  public Producer<K, V> create() {
+    return new KafkaProducer<>(configs);
+  }
+}
diff --git a/contrib/bolts/kafka/src/java/org/apache/heron/bolts/kafka/KafkaBolt.java b/contrib/bolts/kafka/src/java/org/apache/heron/bolts/kafka/KafkaBolt.java
new file mode 100644
index 0000000..95ce3f8
--- /dev/null
+++ b/contrib/bolts/kafka/src/java/org/apache/heron/bolts/kafka/KafkaBolt.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.heron.bolts.kafka;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.heron.api.Config;
+import org.apache.heron.api.bolt.BaseRichBolt;
+import org.apache.heron.api.bolt.OutputCollector;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Tuple;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+
+import static org.apache.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE;
+
+/**
+ * Bolt that publishes every incoming tuple to Kafka as a single {@link ProducerRecord}.
+ * <p>
+ * Topic, key and value of the record are taken from the supplied {@link TupleTransformer}.
+ * How a send is confirmed depends on the topology reliability mode read in
+ * {@link #prepare(Map, TopologyContext, OutputCollector)}:
+ * <ul>
+ * <li>{@code EFFECTIVELY_ONCE}: {@link #execute(Tuple)} blocks until the send has completed and
+ * throws {@link KafkaException} when it failed;</li>
+ * <li>{@code ATLEAST_ONCE}: the send is asynchronous and the tuple is acked or failed from the
+ * producer callback;</li>
+ * <li>any other mode: the send is asynchronous and a failure is only logged.</li>
+ * </ul>
+ *
+ * @param <K> type of the Kafka record key
+ * @param <V> type of the Kafka record value
+ */
+@SuppressWarnings("unused")
+public class KafkaBolt<K, V> extends BaseRichBolt {
+  private static final long serialVersionUID = -3301619269473733618L;
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class);
+  // Set once by the constructor and never rebound, hence final.
+  private final KafkaProducerFactory<K, V> kafkaProducerFactory;
+  private final TupleTransformer<K, V> tupleTransformer;
+  // Created in prepare() instead of the constructor, so it is excluded from serialization.
+  private transient Producer<K, V> producer;
+  private Config.TopologyReliabilityMode topologyReliabilityMode;
+  private transient OutputCollector outputCollector;
+
+  /**
+   * Creates a bolt that sends tuples to Kafka.
+   *
+   * @param kafkaProducerFactory factory asked for the producer when the bolt is prepared
+   * @param tupleTransformer maps each tuple to the topic, key and value of its record
+   */
+  @SuppressWarnings("WeakerAccess")
+  public KafkaBolt(KafkaProducerFactory<K, V> kafkaProducerFactory,
+                   TupleTransformer<K, V> tupleTransformer) {
+    this.kafkaProducerFactory = kafkaProducerFactory;
+    this.tupleTransformer = tupleTransformer;
+  }
+
+  /**
+   * Reads the reliability mode from the configuration (falling back to {@code ATMOST_ONCE} when
+   * the key is absent), creates the producer and remembers the collector.
+   *
+   * @param heronConf topology configuration
+   * @param context topology context; not used by this bolt
+   * @param collector collector used to ack or fail tuples in {@code ATLEAST_ONCE} mode
+   */
+  @Override
+  public void prepare(Map<String, Object> heronConf, TopologyContext context,
+                      OutputCollector collector) {
+    topologyReliabilityMode = Config.TopologyReliabilityMode
+        .valueOf(heronConf.getOrDefault(Config.TOPOLOGY_RELIABILITY_MODE, ATMOST_ONCE).toString());
+    producer = kafkaProducerFactory.create();
+    outputCollector = collector;
+  }
+
+  /**
+   * Runs the superclass cleanup and closes the producer if one was created.
+   */
+  @Override
+  public void cleanup() {
+    super.cleanup();
+    if (producer != null) {
+      producer.close();
+    }
+  }
+
+  /**
+   * Converts the tuple into a {@link ProducerRecord} and sends it, blocking for the result in
+   * {@code EFFECTIVELY_ONCE} mode and sending asynchronously otherwise.
+   *
+   * @param input tuple to publish
+   * @throws KafkaException in {@code EFFECTIVELY_ONCE} mode when the send fails
+   */
+  @Override
+  public void execute(Tuple input) {
+    ProducerRecord<K, V> producerRecord = new ProducerRecord<>(
+        tupleTransformer.getTopicName(input),
+        tupleTransformer.transformToKey(input),
+        tupleTransformer.transformToValue(input));
+    if (topologyReliabilityMode == Config.TopologyReliabilityMode.EFFECTIVELY_ONCE) {
+      sendAndWait(producerRecord);
+    } else {
+      sendAsync(producerRecord, input);
+    }
+  }
+
+  /**
+   * Sends the record and blocks until the producer reports the outcome.
+   */
+  private void sendAndWait(ProducerRecord<K, V> producerRecord) {
+    try {
+      producer.send(producerRecord).get();
+    } catch (InterruptedException e) {
+      LOG.error("interrupted while waiting for the record to be sent", e);
+      // Restore the interrupt flag so callers further up the stack can observe it.
+      // NOTE(review): the outcome of the send is unknown at this point, yet execute() returns
+      // normally - confirm this is the intended behaviour for EFFECTIVELY_ONCE.
+      Thread.currentThread().interrupt();
+    } catch (ExecutionException e) {
+      LOG.error("error has occurred when sending record to Kafka", e);
+      throw new KafkaException(e);
+    }
+  }
+
+  /**
+   * Sends the record without waiting; in {@code ATLEAST_ONCE} mode the completion callback acks
+   * the tuple on success and fails it on error, in other modes an error is only logged.
+   */
+  private void sendAsync(ProducerRecord<K, V> producerRecord, Tuple input) {
+    // NOTE(review): the callback is invoked by the producer, presumably on a thread other than
+    // the one calling execute() - confirm OutputCollector may be used from there.
+    producer.send(producerRecord, (recordMetadata, e) -> {
+      boolean atLeastOnce =
+          topologyReliabilityMode == Config.TopologyReliabilityMode.ATLEAST_ONCE;
+      if (e != null) {
+        LOG.error("error has occurred when sending record to Kafka", e);
+        if (atLeastOnce) {
+          outputCollector.fail(input);
+        }
+      } else if (atLeastOnce) {
+        outputCollector.ack(input);
+      }
+    });
+  }
+
+  /**
+   * Declares no output fields.
+   *
+   * @param declarer unused
+   */
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    //Kafka bolt does not emit anything
+  }
+}
diff --git a/contrib/bolts/kafka/src/java/org/apache/heron/bolts/kafka/KafkaProducerFactory.java b/contrib/bolts/kafka/src/java/org/apache/heron/bolts/kafka/KafkaProducerFactory.java
new file mode 100644
index 0000000..6a7ba65
--- /dev/null
+++ b/contrib/bolts/kafka/src/java/org/apache/heron/bolts/kafka/KafkaProducerFactory.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.heron.bolts.kafka;
+
+import java.io.Serializable;
+
+import org.apache.kafka.clients.producer.Producer;
+
+/**
+ * Serializable factory for Kafka {@link Producer} instances.
+ * <p>
+ * The interface has a single abstract method, so it can be implemented with a lambda or a
+ * method reference.
+ *
+ * @param <K> type of the record key
+ * @param <V> type of the record value
+ */
+@FunctionalInterface
+public interface KafkaProducerFactory<K, V> extends Serializable {
+  /**
+   * Creates a producer.
+   *
+   * @return the producer to send records with
+   */
+  Producer<K, V> create();
+}
diff --git a/contrib/bolts/kafka/src/java/org/apache/heron/bolts/kafka/TupleTransformer.java b/contrib/bolts/kafka/src/java/org/apache/heron/bolts/kafka/TupleTransformer.java
new file mode 100644
index 0000000..8611aef
--- /dev/null
+++ b/contrib/bolts/kafka/src/java/org/apache/heron/bolts/kafka/TupleTransformer.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.heron.bolts.kafka;
+
+import java.io.Serializable;
+
+import org.apache.heron.api.tuple.Tuple;
+
+/**
+ * Maps a {@link Tuple} to the topic name, key and value of a Kafka record.
+ * <p>
+ * Only {@link #getTopicName(Tuple)} has to be implemented; the key and value default to the
+ * tuple fields named {@code "key"} and {@code "value"}.
+ *
+ * @param <K> type of the record key
+ * @param <V> type of the record value
+ */
+public interface TupleTransformer<K, V> extends Serializable {
+
+  /**
+   * Extracts the record key from the tuple.
+   *
+   * @param tuple tuple to read from
+   * @return the value of the {@code "key"} field, or {@code null} when the tuple has no such
+   * field; the cast to {@code K} is unchecked, so a value of another type is not detected here
+   */
+  @SuppressWarnings("unchecked")
+  default K transformToKey(Tuple tuple) {
+    return tuple.contains("key") ? (K) tuple.getValueByField("key") : null;
+  }
+
+  /**
+   * Extracts the record value from the tuple.
+   *
+   * @param tuple tuple to read from
+   * @return the value of the {@code "value"} field, or {@code null} when the tuple has no such
+   * field; the cast to {@code V} is unchecked, so a value of another type is not detected here
+   */
+  @SuppressWarnings("unchecked")
+  default V transformToValue(Tuple tuple) {
+    return tuple.contains("value") ? (V) tuple.getValueByField("value") : null;
+  }
+
+  /**
+   * Chooses the topic for the record built from the tuple.
+   *
+   * @param tuple tuple being published
+   * @return name of the topic
+   */
+  String getTopicName(Tuple tuple);
+}
diff --git a/contrib/bolts/kafka/test/java/BUILD b/contrib/bolts/kafka/test/java/BUILD
new file mode 100644
index 0000000..e713a6c
--- /dev/null
+++ b/contrib/bolts/kafka/test/java/BUILD
@@ -0,0 +1,23 @@
+# Dependencies shared by both Kafka bolt test targets below.
+heron_kafka_bolts_test_dep = [
+    "//contrib/bolts/kafka/src/java:heron-kafka-bolt-java",
+    "//heron/api/src/java:api-java-low-level",
+    "//heron/common/src/java:basics-java",
+    "//heron/common/src/java:config-java",
+    "//third_party/java:junit4",
+    "@org_apache_kafka_kafka_clients//jar",
+    "@org_mockito_mockito_all//jar",
+]
+
+java_test(
+    name = "KafkaBoltTest",
+    srcs = ["org/apache/heron/bolts/kafka/KafkaBoltTest.java"],
+    test_class = "org.apache.heron.bolts.kafka.KafkaBoltTest",
+    deps = heron_kafka_bolts_test_dep,
+)
+
+java_test(
+    name = "DefaultKafkaProducerFactoryTest",
+    srcs = ["org/apache/heron/bolts/kafka/DefaultKafkaProducerFactoryTest.java"],
+    test_class = "org.apache.heron.bolts.kafka.DefaultKafkaProducerFactoryTest",
+    deps = heron_kafka_bolts_test_dep,
+)
diff --git a/contrib/bolts/kafka/test/java/org/apache/heron/bolts/kafka/DefaultKafkaProducerFactoryTest.java b/contrib/bolts/kafka/test/java/org/apache/heron/bolts/kafka/DefaultKafkaProducerFactoryTest.java
new file mode 100644
index 0000000..18a573b
--- /dev/null
+++ b/contrib/bolts/kafka/test/java/org/apache/heron/bolts/kafka/DefaultKafkaProducerFactoryTest.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.heron.bolts.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Smoke test for {@link DefaultKafkaProducerFactory}.
+ */
+public class DefaultKafkaProducerFactoryTest {
+
+  /**
+   * A factory configured with serializers and a bootstrap address must hand back a producer.
+   */
+  @Test
+  public void create() {
+    Map<String, Object> configs = new HashMap<>();
+    configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+    // The factory builds a real KafkaProducer; close it so the test does not leave it open.
+    try (Producer<String, byte[]> producer =
+             new DefaultKafkaProducerFactory<String, byte[]>(configs).create()) {
+      assertNotNull(producer);
+    }
+  }
+}
diff --git a/contrib/bolts/kafka/test/java/org/apache/heron/bolts/kafka/KafkaBoltTest.java b/contrib/bolts/kafka/test/java/org/apache/heron/bolts/kafka/KafkaBoltTest.java
new file mode 100644
index 0000000..9572b11
--- /dev/null
+++ b/contrib/bolts/kafka/test/java/org/apache/heron/bolts/kafka/KafkaBoltTest.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.heron.bolts.kafka;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+import org.apache.heron.api.Config;
+import org.apache.heron.api.bolt.OutputCollector;
+import org.apache.heron.api.tuple.Tuple;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+
+import static org.apache.heron.api.Config.TopologyReliabilityMode.ATLEAST_ONCE;
+import static org.apache.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE;
+import static org.apache.heron.api.Config.TopologyReliabilityMode.EFFECTIVELY_ONCE;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link KafkaBolt} covering cleanup and the three reliability modes, with the
+ * producer, transformer and collector replaced by mocks.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaBoltTest {
+  @Mock
+  private KafkaProducerFactory<String, byte[]> kafkaProducerFactory;
+  @Mock
+  private Producer<String, byte[]> producer;
+  @Mock
+  private TupleTransformer<String, byte[]> tupleTransformer;
+  @Mock
+  private OutputCollector outputCollector;
+  @Mock
+  private Tuple tuple;
+  @Mock
+  private Future<RecordMetadata> future;
+  private KafkaBolt<String, byte[]> kafkaBolt;
+
+  @Before
+  public void setUp() {
+    when(kafkaProducerFactory.create()).thenReturn(producer);
+    kafkaBolt = new KafkaBolt<>(kafkaProducerFactory, tupleTransformer);
+  }
+
+  @Test
+  public void cleanup() {
+    kafkaBolt.prepare(Collections.emptyMap(), null, outputCollector);
+    kafkaBolt.cleanup();
+    verify(producer).close();
+  }
+
+  @Test
+  public void executeATLEASTONCE() {
+    prepareBolt(ATLEAST_ONCE);
+    ProducerRecord<String, byte[]> producerRecord = stubTupleTransformer();
+
+    // A send completed without error must ack the tuple.
+    completeSendWith(producerRecord, null);
+    kafkaBolt.execute(tuple);
+    verify(outputCollector).ack(tuple);
+
+    // A send completed with an error must fail the tuple.
+    completeSendWith(producerRecord, new Exception());
+    kafkaBolt.execute(tuple);
+    verify(outputCollector).fail(tuple);
+  }
+
+  @Test(expected = KafkaException.class)
+  public void executeEFFECTIVEONCE() throws ExecutionException, InterruptedException {
+    prepareBolt(EFFECTIVELY_ONCE);
+    ProducerRecord<String, byte[]> producerRecord = stubTupleTransformer();
+
+    // First send succeeds: the bolt must wait on the returned future.
+    when(producer.send(producerRecord)).thenReturn(future);
+    kafkaBolt.execute(tuple);
+    verify(future).get();
+
+    // Second send fails: the ExecutionException must surface as a KafkaException.
+    when(future.get()).thenThrow(ExecutionException.class);
+    kafkaBolt.execute(tuple);
+  }
+
+  @Test
+  public void executeATMOSTONCE() {
+    prepareBolt(ATMOST_ONCE);
+    ProducerRecord<String, byte[]> producerRecord = stubTupleTransformer();
+
+    kafkaBolt.execute(tuple);
+    verify(producer).send(eq(producerRecord), any(Callback.class));
+  }
+
+  /**
+   * Prepares the bolt with a configuration holding only the given reliability mode.
+   */
+  private void prepareBolt(Config.TopologyReliabilityMode mode) {
+    kafkaBolt.prepare(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE, mode),
+        null, outputCollector);
+  }
+
+  /**
+   * Stubs the transformer to map the mocked tuple to a fixed topic, key and value, and returns
+   * the record the bolt is expected to build from them.
+   */
+  private ProducerRecord<String, byte[]> stubTupleTransformer() {
+    byte[] value = new byte[]{1, 2, 3};
+    when(tupleTransformer.transformToKey(tuple)).thenReturn("key");
+    when(tupleTransformer.transformToValue(tuple)).thenReturn(value);
+    when(tupleTransformer.getTopicName(tuple)).thenReturn("topic");
+    return new ProducerRecord<>("topic", "key", value);
+  }
+
+  /**
+   * Stubs the callback-taking send so that it invokes the callback immediately with the given
+   * error ({@code null} meaning success) and then returns the mocked future.
+   */
+  private void completeSendWith(ProducerRecord<String, byte[]> producerRecord, Exception error) {
+    when(producer.send(eq(producerRecord), any(Callback.class)))
+        .then((Answer<Future<RecordMetadata>>) invocationOnMock -> {
+          invocationOnMock.getArgumentAt(1, Callback.class)
+              .onCompletion(new RecordMetadata(null, 0, 0, 0, null, 0, 0), error);
+          return future;
+        });
+  }
+}

Reply via email to