This is an automated email from the ASF dual-hosted git repository.
nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new f5cbfd5 Feature/create kafka spout (#3198)
f5cbfd5 is described below
commit f5cbfd524d33bb61400a6a41e1f9a3f7ca33008d
Author: SiMing Weng <[email protected]>
AuthorDate: Mon Apr 1 13:56:42 2019 -0400
Feature/create kafka spout (#3198)
* initial commit
* remove dependency on heron-storm compatibility library
* insert Copyright header
* use constant in ConsumerConfig for client configuration
add sample topology
* change to default Kafka Broker port number
* add Javadoc documentation
* add documentation 1
* include unit tests for heron kafka spout in Travis
* add documentation 2
* support EFFECTIVE_ONCE mode
allow defining multiple output streams in ConsumerRecordTransformer
---
.travis.yml | 2 +
.../java/heron-kafka-spout-parent/doc/README.md | 62 +++
.../heron-kafka-spout-sample/pom.xml | 42 ++
.../sample/HeronKafkaSpoutSampleTopology.java | 95 +++++
.../heron-kafka-spout/pom.xml | 60 +++
.../spouts/kafka/ConsumerRecordTransformer.java | 54 +++
.../kafka/DefaultConsumerRecordTransformer.java | 22 ++
.../spouts/kafka/DefaultKafkaConsumerFactory.java | 50 +++
.../spouts/kafka/DefaultTopicPatternProvider.java | 45 +++
.../heron/spouts/kafka/KafkaConsumerFactory.java | 36 ++
.../heron/spouts/kafka/KafkaMetricDecorator.java | 38 ++
.../org/apache/heron/spouts/kafka/KafkaSpout.java | 425 +++++++++++++++++++++
.../heron/spouts/kafka/TopicPatternProvider.java | 33 ++
.../DefaultConsumerRecordTransformerTest.java | 55 +++
.../kafka/DefaultKafkaConsumerFactoryTest.java | 48 +++
.../kafka/DefaultTopicPatternProviderTest.java | 31 ++
.../spouts/kafka/KafkaMetricDecoratorTest.java | 43 +++
.../apache/heron/spouts/kafka/KafkaSpoutTest.java | 262 +++++++++++++
.../kafka/java/heron-kafka-spout-parent/pom.xml | 76 ++++
19 files changed, 1479 insertions(+)
diff --git a/.travis.yml b/.travis.yml
index 464e5b4..d9aafc7 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -49,3 +49,5 @@ script:
- which python2.7
- python2.7 -V
- scripts/travis/ci.sh
+ # run unit tests for heron kafka spout
+ - mvn -f contrib/spouts/kafka/java/heron-kafka-spout-parent/pom.xml test
diff --git a/contrib/spouts/kafka/java/heron-kafka-spout-parent/doc/README.md
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/doc/README.md
new file mode 100644
index 0000000..6666e5c
--- /dev/null
+++ b/contrib/spouts/kafka/java/heron-kafka-spout-parent/doc/README.md
@@ -0,0 +1,62 @@
+# Heron Kafka Spout
+
+[Kafka Spout](../heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaSpout.java) enables a Heron topology to consume data from a Kafka cluster as input to its stream processing pipeline. It is written primarily against two APIs: the Heron API and the Kafka Client API.
+
+## Configuring the underlying Kafka Consumer
+
+Each Kafka Spout instance creates its underlying [Consumer](https://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html) instance via a factory interface, [KafkaConsumerFactory](../heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaConsumerFactory.java), which is passed in as one of the constructor arguments.
+
+The simplest approach is to use the provided [DefaultKafkaConsumerFactory](../heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultKafkaConsumerFactory.java). It takes a `Map<String, Object>` as its only input, which should contain all the user-configured properties defined by [ConsumerConfig](https://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/clients/consumer/ConsumerConfig.html)
+
+_Note: `enable.auto.commit` is always set to false in `DefaultKafkaConsumerFactory` because the Kafka Spout needs to manage offset commits manually. Any custom implementation of `KafkaConsumerFactory` should do the same._
+
+```java
+Map<String, Object> kafkaConsumerConfig = new HashMap<>();
+//connect to Kafka broker at localhost:9092
+kafkaConsumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
+//group ID of the consumer group
+kafkaConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-kafka-spout");
+//key and value deserializers
+kafkaConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
+kafkaConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
+
+KafkaConsumerFactory<String, String> kafkaConsumerFactory = new
DefaultKafkaConsumerFactory<>(kafkaConsumerConfig);
+```
+
+## Subscribe to topic(s)
+
+The Kafka Spout instance can be configured to subscribe to either a collection of topics, by specifying the topic names in a `Collection<String>`, or to all topics matching a regular expression supplied by an implementation of [TopicPatternProvider](../heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/TopicPatternProvider.java). There is a [DefaultTopicPatternProvider](../heron-kafka-spout/src/main/java/org/apache/heron/spout [...]
+
+```java
+//subscribe to a specific named topic
+new KafkaSpout<>(kafkaConsumerFactory, Collections.singletonList("test-topic"));
+
+//subscribe to topics matching a pattern
+new KafkaSpout<>(kafkaConsumerFactory, new
DefaultTopicPatternProvider("test-.*"));
+```
+
+## Convert ConsumerRecord to Tuple
+
+The spout delegates the conversion of each Kafka [ConsumerRecord](https://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/clients/consumer/ConsumerRecord.html) into an output tuple to the [ConsumerRecordTransformer](../heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/ConsumerRecordTransformer.java); the [DefaultConsumerRecordTransformer](../heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultConsumerRecordTransformer.java) is provided to simply conver [...]
+
+Users can create their own implementation of the `ConsumerRecordTransformer` interface and set it on the `KafkaSpout` via the [setConsumerRecordTransformer](../heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaSpout.java) method.
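+
For illustration, a custom transformer might also carry the record's topic and offset alongside the key and value. The class name and the extra fields below are hypothetical, not part of this commit; only the three interface methods come from `ConsumerRecordTransformer`:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.heron.spouts.kafka.ConsumerRecordTransformer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical example: emit topic and offset in addition to key and value,
// on the default output stream (getOutputStreams() keeps its default).
public class OffsetAwareTransformer implements ConsumerRecordTransformer<String, String> {
    private static final long serialVersionUID = 1L;

    @Override
    public List<String> getFieldNames(String streamId) {
        return Arrays.asList("key", "value", "topic", "offset");
    }

    @Override
    public Map<String, List<Object>> transform(ConsumerRecord<String, String> record) {
        return Collections.singletonMap("default",
                Arrays.asList(record.key(), record.value(), record.topic(), record.offset()));
    }
}
```

It would then be installed with `kafkaSpout.setConsumerRecordTransformer(new OffsetAwareTransformer());`.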
+
+## Behavior in Different Topology Reliability Modes
+
+### `ATMOST_ONCE` mode
+The topology does not turn the `acking` mechanism on, so the KafkaSpout can afford to emit each tuple without a message ID, and it immediately commits the currently-read offset back to the Kafka broker; neither the `ack()` nor the `fail()` callback is ever invoked. "In-flight" tuples are therefore lost if the KafkaSpout instance crashes or the topology is restarted. That is what `ATMOST_ONCE` offers.
+
+### `ATLEAST_ONCE` mode
+The `acking` mechanism is turned on topology-wide, so the KafkaSpout uses the `ack registry` to keep track of all the **contiguous** acknowledged offset ranges for each partition, while the `failure registry` keeps track of the **lowest** failed offset for each partition. When the Kafka Consumer needs to poll the Kafka cluster for more records (because everything from the previous poll has been emitted), the KafkaSpout reconciles as following for ea [...]
+
+1. if there are any failed tuples, seek back to the lowest corresponding offset
+2. discard all acknowledgements that have been received for offsets greater than the lowest failed offset
+3. clear the lowest failed offset in the `failure registry`
+4. commit the offset at the upper boundary of the first range in the `ack registry`
+
+Then it polls the Kafka cluster for the next batch of records (i.e. starting from the lowest failed offset, if any).
+
+This guarantees that each tuple emitted by the KafkaSpout is successfully processed across the whole topology at least once.
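+
The per-partition bookkeeping above can be sketched in plain Java. This is a simplified, single-partition model under stated assumptions: method names, the range-merging details, and the commit-offset convention are illustrative, not the actual KafkaSpout internals (which keep one registry per `TopicPartition`):

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Single-partition sketch of the ack/failure reconciliation described above.
class AckRegistrySketch {
    // contiguous acknowledged ranges: start offset -> end offset, inclusive
    private final NavigableMap<Long, Long> ackRanges = new TreeMap<>();
    private Long lowestFailedOffset; // null when nothing has failed

    void ack(long offset) {
        long start = offset;
        long end = offset;
        Map.Entry<Long, Long> left = ackRanges.floorEntry(offset);
        if (left != null) {
            if (left.getValue() >= offset) {
                return; // already inside an acknowledged range
            }
            if (left.getValue() == offset - 1) {
                start = left.getKey(); // extend the adjacent range on the left
            }
        }
        Map.Entry<Long, Long> right = ackRanges.ceilingEntry(offset + 1);
        if (right != null && right.getKey() == offset + 1) {
            end = right.getValue(); // absorb the adjacent range on the right
            ackRanges.remove(right.getKey());
        }
        ackRanges.put(start, end);
    }

    void fail(long offset) {
        if (lowestFailedOffset == null || offset < lowestFailedOffset) {
            lowestFailedOffset = offset;
        }
    }

    // returns the offset to seek to before the next poll, or null to poll
    // from the current position; also drops acks past the failed offset
    Long reconcile() {
        Long seekTo = null;
        if (lowestFailedOffset != null) {
            seekTo = lowestFailedOffset;                          // step 1
            ackRanges.tailMap(lowestFailedOffset, true).clear();  // step 2
            Map.Entry<Long, Long> left = ackRanges.floorEntry(lowestFailedOffset);
            if (left != null && left.getValue() >= lowestFailedOffset) {
                ackRanges.put(left.getKey(), lowestFailedOffset - 1);
            }
            lowestFailedOffset = null;                            // step 3
        }
        return seekTo;
    }

    // step 4: the offset to commit, i.e. one past the first contiguous range
    Long offsetToCommit() {
        return ackRanges.isEmpty() ? null : ackRanges.firstEntry().getValue() + 1;
    }
}
```

For example, with acks for offsets 0, 1, 3, 4 and a failure at offset 2, `offsetToCommit()` yields 2 and `reconcile()` seeks back to 2, discarding the acks at 3 and 4 so they are replayed.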
+
+### `EFFECTIVE_ONCE`
+Not implemented yet
\ No newline at end of file
diff --git
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout-sample/pom.xml
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout-sample/pom.xml
new file mode 100644
index 0000000..9eb1571
--- /dev/null
+++
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout-sample/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright 2019
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>heron-kafka-spout-parent</artifactId>
+ <groupId>org.apache.heron</groupId>
+ <version>1.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>heron-kafka-spout-sample</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.heron</groupId>
+ <artifactId>heron-kafka-spout</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>com.twitter.heron</groupId>
+ <artifactId>heron-storm</artifactId>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout-sample/src/main/java/org/apache/heron/spouts/kafka/sample/HeronKafkaSpoutSampleTopology.java
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout-sample/src/main/java/org/apache/heron/spouts/kafka/sample/HeronKafkaSpoutSampleTopology.java
new file mode 100644
index 0000000..9a7de40
--- /dev/null
+++
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout-sample/src/main/java/org/apache/heron/spouts/kafka/sample/HeronKafkaSpoutSampleTopology.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2019
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.heron.spouts.kafka.sample;
+
+import com.twitter.heron.api.Config;
+import com.twitter.heron.api.bolt.BaseRichBolt;
+import com.twitter.heron.api.bolt.OutputCollector;
+import com.twitter.heron.api.topology.OutputFieldsDeclarer;
+import com.twitter.heron.api.topology.TopologyBuilder;
+import com.twitter.heron.api.topology.TopologyContext;
+import com.twitter.heron.api.tuple.Tuple;
+import com.twitter.heron.common.basics.ByteAmount;
+import com.twitter.heron.simulator.Simulator;
+import org.apache.heron.spouts.kafka.DefaultKafkaConsumerFactory;
+import org.apache.heron.spouts.kafka.KafkaConsumerFactory;
+import org.apache.heron.spouts.kafka.KafkaSpout;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class HeronKafkaSpoutSampleTopology {
+ private static final Logger LOG =
LoggerFactory.getLogger(HeronKafkaSpoutSampleTopology.class);
+ private static final String KAFKA_SPOUT_NAME = "kafka-spout";
+ private static final String LOGGING_BOLT_NAME = "logging-bolt";
+
+ public static void main(String[] args) {
+ Map<String, Object> kafkaConsumerConfig = new HashMap<>();
+ kafkaConsumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
+ kafkaConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG,
"sample-kafka-spout");
+ kafkaConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
+
kafkaConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
+ LOG.info("Kafka Consumer Config: {}", kafkaConsumerConfig);
+
+ KafkaConsumerFactory<String, String> kafkaConsumerFactory = new
DefaultKafkaConsumerFactory<>(kafkaConsumerConfig);
+
+ TopologyBuilder topologyBuilder = new TopologyBuilder();
+ topologyBuilder.setSpout(KAFKA_SPOUT_NAME, new
KafkaSpout<>(kafkaConsumerFactory, Collections.singletonList("test-topic")));
+ topologyBuilder.setBolt(LOGGING_BOLT_NAME, new
LoggingBolt()).shuffleGrouping(KAFKA_SPOUT_NAME);
+ Config config = new Config();
+ config.setNumStmgrs(1);
+ config.setContainerCpuRequested(1);
+ config.setContainerRamRequested(ByteAmount.fromGigabytes(1));
+ config.setContainerDiskRequested(ByteAmount.fromGigabytes(1));
+
+ config.setComponentCpu(KAFKA_SPOUT_NAME, 0.25);
+ config.setComponentRam(KAFKA_SPOUT_NAME,
ByteAmount.fromMegabytes(256));
+ config.setComponentDisk(KAFKA_SPOUT_NAME,
ByteAmount.fromMegabytes(512));
+
+ config.setComponentCpu(LOGGING_BOLT_NAME, 0.25);
+ config.setComponentRam(LOGGING_BOLT_NAME,
ByteAmount.fromMegabytes(256));
+ config.setComponentDisk(LOGGING_BOLT_NAME,
ByteAmount.fromMegabytes(256));
+
+ Simulator simulator = new Simulator();
+ simulator.submitTopology("heron-kafka-spout-sample-topology", config,
topologyBuilder.createTopology());
+ }
+
+ public static class LoggingBolt extends BaseRichBolt {
+ private static final Logger LOG =
LoggerFactory.getLogger(LoggingBolt.class);
+ private transient OutputCollector outputCollector;
+
+ @Override
+ public void prepare(Map<String, Object> heronConf, TopologyContext
context, OutputCollector collector) {
+ this.outputCollector = collector;
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ LOG.info("{}", input);
+ outputCollector.ack(input);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ //do nothing
+ }
+ }
+}
diff --git
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/pom.xml
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/pom.xml
new file mode 100644
index 0000000..2c7e072
--- /dev/null
+++
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/pom.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright 2019
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.heron</groupId>
+ <artifactId>heron-kafka-spout-parent</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>heron-kafka-spout</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.twitter.heron</groupId>
+ <artifactId>heron-storm</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ <version>5.4.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ <version>5.4.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-junit-jupiter</artifactId>
+ <version>2.24.5</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/ConsumerRecordTransformer.java
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/ConsumerRecordTransformer.java
new file mode 100644
index 0000000..51b4281
--- /dev/null
+++
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/ConsumerRecordTransformer.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2019
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.heron.spouts.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This is the transformer class whose responsibility is to:
+ *
+ * <ol>
+ * <li>define the id of the output streams</li>
+ * <li>declare the list of fields of the output tuple</li>
+ * <li>translate the incoming Kafka record into the list of values of the
output tuple</li>
+ * </ol>
+ * <p>
+ * The default behavior of the built-in transformer will output to stream
"default", with 2 fields, "key" and "value" which are the key and value field
of the incoming Kafka record.
+ *
+ * @param <K> the type of the key of the Kafka record
+ * @param <V> the type of the value of the Kafka record
+ * @see KafkaSpout#setConsumerRecordTransformer(ConsumerRecordTransformer)
+ */
+public interface ConsumerRecordTransformer<K, V> extends Serializable {
+ default List<String> getOutputStreams() {
+ return Collections.singletonList("default");
+ }
+
+ default List<String> getFieldNames(String streamId) {
+ return Arrays.asList("key", "value");
+ }
+
+ default Map<String, List<Object>> transform(ConsumerRecord<K, V> record) {
+ return Collections.singletonMap("default", Arrays.asList(record.key(),
record.value()));
+ }
+}
diff --git
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultConsumerRecordTransformer.java
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultConsumerRecordTransformer.java
new file mode 100644
index 0000000..ac00b4e
--- /dev/null
+++
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultConsumerRecordTransformer.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2019
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.heron.spouts.kafka;
+
+@SuppressWarnings("WeakerAccess")
+public class DefaultConsumerRecordTransformer<K, V> implements
ConsumerRecordTransformer<K, V> {
+ private static final long serialVersionUID = -8971687732883148619L;
+}
diff --git
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultKafkaConsumerFactory.java
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultKafkaConsumerFactory.java
new file mode 100644
index 0000000..bbc7ac5
--- /dev/null
+++
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultKafkaConsumerFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2019
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.heron.spouts.kafka;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+import java.util.Map;
+
+/**
+ * a simple Kafka Consumer factory that builds a KafkaConsumer instance from a
{@link Map} as the properties to configure it.
+ *
+ * @param <K> the type of the key of the Kafka record
+ * @param <V> the type of the value of the Kafka record
+ */
+public class DefaultKafkaConsumerFactory<K, V> implements
KafkaConsumerFactory<K, V> {
+ private static final long serialVersionUID = -2346087278604915148L;
+ private Map<String, Object> config;
+
+ /**
+ * the config map, key strings should be from {@link ConsumerConfig}
+ *
+ * @param config the configuration map
+ * @see <a
href="https://kafka.apache.org/documentation/#consumerconfigs">Kafka Consumer
Configs</a>
+ */
+ public DefaultKafkaConsumerFactory(Map<String, Object> config) {
+ this.config = config;
+ }
+
+ @Override
+ public Consumer<K, V> create() {
+ config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ return new KafkaConsumer<>(config);
+ }
+}
diff --git
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultTopicPatternProvider.java
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultTopicPatternProvider.java
new file mode 100644
index 0000000..b48e2ca
--- /dev/null
+++
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultTopicPatternProvider.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2019
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.heron.spouts.kafka;
+
+import java.util.regex.Pattern;
+
+/**
+ * the built-in default pattern provider to create a topic pattern out of a
regex string
+ */
+public class DefaultTopicPatternProvider implements TopicPatternProvider {
+ private static final long serialVersionUID = 5534026856505613199L;
+ private String regex;
+
+ /**
+ * create a provider out of a regular expression string
+ *
+ * @param regex topic name regular expression
+ */
+ @SuppressWarnings("WeakerAccess")
+ public DefaultTopicPatternProvider(String regex) {
+ this.regex = regex;
+ }
+
+ @Override
+ public Pattern create() {
+ if (regex == null) {
+ throw new IllegalArgumentException("regex can not be null");
+ }
+ return Pattern.compile(regex);
+ }
+}
diff --git
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaConsumerFactory.java
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaConsumerFactory.java
new file mode 100644
index 0000000..c67aeac
--- /dev/null
+++
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaConsumerFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2019
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.heron.spouts.kafka;
+
+import org.apache.kafka.clients.consumer.Consumer;
+
+import java.io.Serializable;
+
+/**
+ * the factory to create the underlying KafkaConsumer instance the Kafka Spout
will be using to consume data from Kafka cluster
+ *
+ * @param <K> the type of the key of the Kafka record
+ * @param <V> the type of the value of the Kafka record
+ */
+public interface KafkaConsumerFactory<K, V> extends Serializable {
+ /**
+ * create the underlying KafkaConsumer
+ *
+ * @return kafka consumer instance
+ */
+ Consumer<K, V> create();
+}
diff --git
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaMetricDecorator.java
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaMetricDecorator.java
new file mode 100644
index 0000000..9363c3a
--- /dev/null
+++
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaMetricDecorator.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2019
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.heron.spouts.kafka;
+
+import com.twitter.heron.api.metric.IMetric;
+import org.apache.kafka.common.Metric;
+
+/**
+ * a decorator to convert a Kafka Metric to a Heron Metric so that Kafka
metrics can be exposed via Heron Metrics Manager
+ *
+ * @param <M> the Kafka Metric type
+ */
+public class KafkaMetricDecorator<M extends Metric> implements IMetric<Object>
{
+ private M metric;
+
+ KafkaMetricDecorator(M metric) {
+ this.metric = metric;
+ }
+
+ @Override
+ public Object getValueAndReset() {
+ return metric.metricValue();
+ }
+}
diff --git
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaSpout.java
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaSpout.java
new file mode 100644
index 0000000..6334001
--- /dev/null
+++
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaSpout.java
@@ -0,0 +1,425 @@
+/*
+ * Copyright 2019
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.heron.spouts.kafka;
+
+import com.twitter.heron.api.Config;
+import com.twitter.heron.api.spout.BaseRichSpout;
+import com.twitter.heron.api.spout.SpoutOutputCollector;
+import com.twitter.heron.api.state.State;
+import com.twitter.heron.api.topology.IStatefulComponent;
+import com.twitter.heron.api.topology.OutputFieldsDeclarer;
+import com.twitter.heron.api.topology.TopologyContext;
+import com.twitter.heron.api.tuple.Fields;
+import com.twitter.heron.common.basics.SingletonRegistry;
+import com.twitter.heron.common.config.SystemConfig;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.stream.Collectors;
+
+/**
+ * Kafka spout to consume data from Kafka topic(s), each record is converted
into a tuple via {@link ConsumerRecordTransformer}, and emitted into a topology
+ *
+ * @param <K> the type of the key field of the Kafka record
+ * @param <V> the type of the value field of the Kafka record
+ */
+public class KafkaSpout<K, V> extends BaseRichSpout implements
IStatefulComponent<TopicPartition, Long> {
+ private static final Logger LOG =
LoggerFactory.getLogger(KafkaSpout.class);
+ private static final long serialVersionUID = -2271355516537883361L;
+ private int metricsIntervalInSecs = 60;
+ private KafkaConsumerFactory<K, V> kafkaConsumerFactory;
+ private TopicPatternProvider topicPatternProvider;
+ private Collection<String> topicNames;
+ private ConsumerRecordTransformer<K, V> consumerRecordTransformer = new
DefaultConsumerRecordTransformer<>();
+ private transient SpoutOutputCollector collector;
+ private transient TopologyContext topologyContext;
+ private transient Queue<ConsumerRecord<K, V>> buffer;
+ private transient Consumer<K, V> consumer;
+ private transient Set<TopicPartition> assignedPartitions;
+ private transient Set<MetricName> reportedMetrics;
+ private transient Map<TopicPartition, NavigableMap<Long, Long>>
ackRegistry;
+ private transient Map<TopicPartition, Long> failureRegistry;
+ private Config.TopologyReliabilityMode topologyReliabilityMode =
Config.TopologyReliabilityMode.ATMOST_ONCE;
+ private long previousKafkaMetricsUpdatedTimestamp = 0;
+ private State<TopicPartition, Long> state;
+
+ /**
+ * create a KafkaSpout instance that subscribes to a list of topics
+ *
+ * @param kafkaConsumerFactory kafka consumer factory
+ * @param topicNames list of topic names
+ */
+ public KafkaSpout(KafkaConsumerFactory<K, V> kafkaConsumerFactory,
Collection<String> topicNames) {
+ this.kafkaConsumerFactory = kafkaConsumerFactory;
+ this.topicNames = topicNames;
+ }
+
+    /**
+     * create a KafkaSpout instance that subscribes to all topics matching the topic pattern
+     *
+ * @param kafkaConsumerFactory kafka consumer factory
+ * @param topicPatternProvider provider of the topic matching pattern
+ */
+ @SuppressWarnings("WeakerAccess")
+ public KafkaSpout(KafkaConsumerFactory<K, V> kafkaConsumerFactory,
TopicPatternProvider topicPatternProvider) {
+ this.kafkaConsumerFactory = kafkaConsumerFactory;
+ this.topicPatternProvider = topicPatternProvider;
+ }
+
+ /**
+ * @return the Kafka record transformer instance used by this Kafka Spout
+ */
+ @SuppressWarnings("WeakerAccess")
+ public ConsumerRecordTransformer<K, V> getConsumerRecordTransformer() {
+ return consumerRecordTransformer;
+ }
+
+ /**
+ * set the Kafka record transformer
+ *
+ * @param consumerRecordTransformer kafka record transformer
+ */
+ @SuppressWarnings("WeakerAccess")
+ public void setConsumerRecordTransformer(ConsumerRecordTransformer<K, V>
consumerRecordTransformer) {
+ this.consumerRecordTransformer = consumerRecordTransformer;
+ }
+
+ @Override
+ public void initState(State<TopicPartition, Long> state) {
+ this.state = state;
+ LOG.info("initial state {}", state);
+ }
+
+ @Override
+ public void preSave(String checkpointId) {
+ LOG.info("save state {}", state);
+ consumer.commitAsync(state.entrySet()
+ .stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, entry -> new
OffsetAndMetadata(entry.getValue() + 1))), null);
+ }
+
+ @Override
+ public void open(Map<String, Object> conf, TopologyContext context,
SpoutOutputCollector collector) {
+ this.collector = collector;
+ this.topologyContext = context;
+ this.topologyReliabilityMode =
Config.TopologyReliabilityMode.valueOf(conf.get(Config.TOPOLOGY_RELIABILITY_MODE).toString());
+ metricsIntervalInSecs = (int) ((SystemConfig)
SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG)).getHeronMetricsExportInterval().getSeconds();
+ consumer = kafkaConsumerFactory.create();
+ if (topicNames != null) {
+ consumer.subscribe(topicNames, new
KafkaConsumerRebalanceListener());
+ } else {
+ consumer.subscribe(topicPatternProvider.create(), new
KafkaConsumerRebalanceListener());
+ }
+ buffer = new ArrayDeque<>(500);
+ ackRegistry = new ConcurrentHashMap<>();
+ failureRegistry = new ConcurrentHashMap<>();
+ assignedPartitions = new HashSet<>();
+ reportedMetrics = new HashSet<>();
+ }
+
+ @Override
+ public void nextTuple() {
+ ConsumerRecord<K, V> record = buffer.poll();
+ if (record != null) {
+            // there are still records remaining for emission from the previous poll
+            emitConsumerRecord(record);
+ } else {
+            //all the records from the previous poll have been emitted, or this is the very first poll
+ if (topologyReliabilityMode ==
Config.TopologyReliabilityMode.ATLEAST_ONCE) {
+ ackRegistry.forEach((key, value) -> {
+ if (value != null) {
+                    //seek back to the earliest failed offset if there is any
+                    rewindAndDiscardAck(key, value);
+                    //commit based on the first contiguous acknowledgement range
+ manualCommit(key, value);
+ }
+ });
+ }
+            poll().forEach(buffer::offer);
+ }
+ }
+
+ @Override
+ public void activate() {
+ super.activate();
+ if (!assignedPartitions.isEmpty()) {
+ consumer.resume(assignedPartitions);
+ }
+ }
+
+ @Override
+ public void deactivate() {
+ super.deactivate();
+ if (!assignedPartitions.isEmpty()) {
+ consumer.pause(assignedPartitions);
+ }
+ }
+
+ @Override
+ public void ack(Object msgId) {
+ super.ack(msgId);
+ long start = System.nanoTime();
+ ConsumerRecordMessageId consumerRecordMessageId =
(ConsumerRecordMessageId) msgId;
+ TopicPartition topicPartition =
consumerRecordMessageId.getTopicPartition();
+ long offset = consumerRecordMessageId.getOffset();
+ ackRegistry.putIfAbsent(topicPartition, new ConcurrentSkipListMap<>());
+ NavigableMap<Long, Long> navigableMap =
ackRegistry.get(topicPartition);
+
+ Map.Entry<Long, Long> floorRange = navigableMap.floorEntry(offset);
+ Map.Entry<Long, Long> ceilingRange = navigableMap.ceilingEntry(offset);
+
+ long floorBottom = floorRange != null ? floorRange.getKey() :
Long.MIN_VALUE;
+ long floorTop = floorRange != null ? floorRange.getValue() :
Long.MIN_VALUE;
+ long ceilingBottom = ceilingRange != null ? ceilingRange.getKey() :
Long.MAX_VALUE;
+ long ceilingTop = ceilingRange != null ? ceilingRange.getValue() :
Long.MAX_VALUE;
+
+        /*
+        the ack is for a message that has already been acknowledged. This happens when a failed tuple has caused the
+        Kafka consumer to seek back to an earlier position and some messages are replayed.
+         */
+        if ((offset >= floorBottom && offset <= floorTop) || (offset >= ceilingBottom && offset <= ceilingTop)) {
+            return;
+        }
+ if (ceilingBottom - floorTop == 2) {
+ /*
+            the ack connects the two adjacent ranges
+ */
+ navigableMap.put(floorBottom, ceilingTop);
+ navigableMap.remove(ceilingBottom);
+ } else if (offset == floorTop + 1) {
+            /*
+            the acknowledged offset is the immediate neighbour of the upper bound of the floor range
+            */
+ navigableMap.put(floorBottom, offset);
+ } else if (offset == ceilingBottom - 1) {
+            /*
+            the acknowledged offset is the immediate neighbour of the lower bound of the ceiling range
+            */
+ navigableMap.remove(ceilingBottom);
+ navigableMap.put(offset, ceilingTop);
+ } else {
+ /*
+            the ack starts a new range
+ */
+ navigableMap.put(offset, offset);
+ }
+ LOG.debug("ack {} in {} ns", msgId, System.nanoTime() - start);
+ LOG.debug("{}",
ackRegistry.get(consumerRecordMessageId.getTopicPartition()));
+ }
+
+ @Override
+ public void fail(Object msgId) {
+ super.fail(msgId);
+ ConsumerRecordMessageId consumerRecordMessageId =
(ConsumerRecordMessageId) msgId;
+ TopicPartition topicPartition =
consumerRecordMessageId.getTopicPartition();
+ long offset = consumerRecordMessageId.getOffset();
+ failureRegistry.put(topicPartition,
Math.min(failureRegistry.getOrDefault(topicPartition, Long.MAX_VALUE), offset));
+ LOG.warn("fail {}", msgId);
+ }
+
+ @Override
+ public void close() {
+ consumer.close();
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ consumerRecordTransformer.getOutputStreams()
+ .forEach(s -> declarer.declareStream(s, new
Fields(consumerRecordTransformer.getFieldNames(s))));
+ }
+
+ private void emitConsumerRecord(ConsumerRecord<K, V> record) {
+ consumerRecordTransformer.transform(record)
+ .forEach((s, objects) -> {
+ if (topologyReliabilityMode !=
Config.TopologyReliabilityMode.ATLEAST_ONCE) {
+ collector.emit(s, objects);
+                        //only in effectively-once mode do we need to track the offset of the record that was just emitted into the topology
+ if (topologyReliabilityMode ==
Config.TopologyReliabilityMode.EFFECTIVELY_ONCE) {
+ state.put(new TopicPartition(record.topic(),
record.partition()), record.offset());
+ }
+ } else {
+ //build message id based on topic, partition, offset
of the consumer record
+ ConsumerRecordMessageId consumerRecordMessageId = new
ConsumerRecordMessageId(new TopicPartition(record.topic(), record.partition()),
record.offset());
+ //emit tuple with the message id
+ collector.emit(s, objects, consumerRecordMessageId);
+ }
+ });
+ }
+
+ private void rewindAndDiscardAck(TopicPartition topicPartition,
NavigableMap<Long, Long> ackRanges) {
+ if (failureRegistry.containsKey(topicPartition)) {
+ long earliestFailedOffset = failureRegistry.get(topicPartition);
+ //rewind back to the earliest failed offset
+ consumer.seek(topicPartition, earliestFailedOffset);
+            //discard any acknowledgement whose offset is greater than the earliest failed offset, because the consumer has been rewound past it
+            SortedMap<Long, Long> sortedMap = ackRanges.headMap(earliestFailedOffset);
+ if (!sortedMap.isEmpty()) {
+ sortedMap.put(sortedMap.lastKey(),
Math.min(earliestFailedOffset, sortedMap.get(sortedMap.lastKey())));
+ }
+ ackRegistry.put(topicPartition, new
ConcurrentSkipListMap<>(sortedMap));
+ //failure for this partition has been dealt with
+ failureRegistry.remove(topicPartition);
+ }
+ }
+
+ private void manualCommit(TopicPartition topicPartition,
NavigableMap<Long, Long> ackRanges) {
+        //the first entry in the acknowledgement registry keeps track of the lowest possible offset that can be committed
+ Map.Entry<Long, Long> firstEntry = ackRanges.firstEntry();
+ if (firstEntry != null) {
+ consumer.commitAsync(Collections.singletonMap(topicPartition, new
OffsetAndMetadata(firstEntry.getValue() + 1)), null);
+ }
+ }
+
+ private Iterable<ConsumerRecord<K, V>> poll() {
+ ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(200));
+ if (!records.isEmpty()) {
+            /*
+            since the Kafka Consumer metrics are built up gradually based on the partitions it consumes,
+            we need to periodically check whether there are any new metrics to register after each poll.
+            */
+            //metricsIntervalInSecs is in seconds, while the timestamps are in milliseconds
+            if (System.currentTimeMillis() - previousKafkaMetricsUpdatedTimestamp > metricsIntervalInSecs * 1000L) {
+                registerConsumerMetrics();
+                previousKafkaMetricsUpdatedTimestamp = System.currentTimeMillis();
+ }
+ if (topologyReliabilityMode ==
Config.TopologyReliabilityMode.ATMOST_ONCE) {
+ consumer.commitAsync();
+ }
+ return records;
+ }
+ return Collections.emptyList();
+ }
+
+ private void registerConsumerMetrics() {
+ consumer.metrics().forEach((metricName, o) -> {
+ if (!reportedMetrics.contains(metricName)) {
+ reportedMetrics.add(metricName);
+ String exposedName = extractKafkaMetricName(metricName);
+                LOG.info("register Kafka Consumer metric {}", exposedName);
+ topologyContext.registerMetric(exposedName, new
KafkaMetricDecorator<>(o), metricsIntervalInSecs);
+ }
+ });
+ }
+
+ private String extractKafkaMetricName(MetricName metricName) {
+ StringBuilder builder = new StringBuilder()
+ .append(metricName.name())
+ .append('-')
+ .append(metricName.group());
+ metricName.tags().forEach((s, s2) -> builder.append('-')
+ .append(s)
+ .append('-')
+ .append(s2));
+        LOG.info("register Kafka Consumer metric {}", builder);
+ return builder.toString();
+ }
+
+ static class ConsumerRecordMessageId {
+ private TopicPartition topicPartition;
+ private long offset;
+
+ ConsumerRecordMessageId(TopicPartition topicPartition, long offset) {
+ this.topicPartition = topicPartition;
+ this.offset = offset;
+ }
+
+ @Override
+ public String toString() {
+ return "ConsumerRecordMessageId{" +
+ "topicPartition=" + topicPartition +
+ ", offset=" + offset +
+ '}';
+ }
+
+ TopicPartition getTopicPartition() {
+ return topicPartition;
+ }
+
+ long getOffset() {
+ return offset;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ ConsumerRecordMessageId that = (ConsumerRecordMessageId) o;
+
+ if (offset != that.offset) return false;
+ return topicPartition.equals(that.topicPartition);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = topicPartition.hashCode();
+ result = 31 * result + (int) (offset ^ (offset >>> 32));
+ return result;
+ }
+ }
+
+ class KafkaConsumerRebalanceListener implements ConsumerRebalanceListener {
+
+ @Override
+        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
+ assignedPartitions.removeAll(collection);
+ if (topologyReliabilityMode ==
Config.TopologyReliabilityMode.ATLEAST_ONCE) {
+ collection.forEach(topicPartition -> {
+ NavigableMap<Long, Long> navigableMap =
ackRegistry.remove(topicPartition);
+ if (navigableMap != null) {
+ Map.Entry<Long, Long> entry =
navigableMap.firstEntry();
+ if (entry != null) {
+
consumer.commitAsync(Collections.singletonMap(topicPartition, new
OffsetAndMetadata(Math.min(failureRegistry.getOrDefault(topicPartition,
Long.MAX_VALUE), entry.getValue()) + 1)), null);
+ }
+ }
+ failureRegistry.remove(topicPartition);
+ });
+ } else if (topologyReliabilityMode ==
Config.TopologyReliabilityMode.EFFECTIVELY_ONCE) {
+ collection.forEach(topicPartition ->
state.remove(topicPartition));
+ }
+ }
+
+ @Override
+        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
+ assignedPartitions.addAll(collection);
+ if (topologyReliabilityMode ==
Config.TopologyReliabilityMode.ATLEAST_ONCE) {
+ collection.forEach(topicPartition -> {
+ try {
+ long nextRecordPosition =
consumer.position(topicPartition, Duration.ofSeconds(5));
+ ackRegistry.put(topicPartition, new
ConcurrentSkipListMap<>(Collections.singletonMap(nextRecordPosition - 1,
nextRecordPosition - 1)));
+ } catch (TimeoutException e) {
+                        LOG.warn("cannot get the position of the next record to consume for partition {}", topicPartition);
+ ackRegistry.remove(topicPartition);
+ }
+ failureRegistry.remove(topicPartition);
+ });
+ } else if (topologyReliabilityMode ==
Config.TopologyReliabilityMode.EFFECTIVELY_ONCE) {
+ collection.forEach(topicPartition -> {
+ if (state.containsKey(topicPartition)) {
+ consumer.seek(topicPartition,
state.get(topicPartition));
+ }
+ });
+ }
+ }
+ }
+}
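The NavigableMap bookkeeping in KafkaSpout.ack() above maintains acknowledged offsets as disjoint inclusive ranges and only ever commits the top of the first contiguous range. A standalone sketch of just that logic (the AckRanges class name is hypothetical; a plain TreeMap stands in for the ConcurrentSkipListMap, and failure handling is omitted):

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Sketch of the ack-range bookkeeping: acked offsets are stored as
// disjoint inclusive ranges [key, value] in a sorted map.
public class AckRanges {
    private final NavigableMap<Long, Long> ranges = new TreeMap<>();

    public void ack(long offset) {
        Map.Entry<Long, Long> floor = ranges.floorEntry(offset);
        Map.Entry<Long, Long> ceiling = ranges.ceilingEntry(offset);
        long floorBottom = floor != null ? floor.getKey() : Long.MIN_VALUE;
        long floorTop = floor != null ? floor.getValue() : Long.MIN_VALUE;
        long ceilingBottom = ceiling != null ? ceiling.getKey() : Long.MAX_VALUE;
        long ceilingTop = ceiling != null ? ceiling.getValue() : Long.MAX_VALUE;
        // duplicate ack: the offset already falls inside an existing range
        if ((offset >= floorBottom && offset <= floorTop)
                || (offset >= ceilingBottom && offset <= ceilingTop)) {
            return;
        }
        if (ceilingBottom - floorTop == 2) {
            // the ack bridges the two adjacent ranges into one
            ranges.put(floorBottom, ceilingTop);
            ranges.remove(ceilingBottom);
        } else if (offset == floorTop + 1) {
            // extend the floor range upwards
            ranges.put(floorBottom, offset);
        } else if (offset == ceilingBottom - 1) {
            // extend the ceiling range downwards
            ranges.remove(ceilingBottom);
            ranges.put(offset, ceilingTop);
        } else {
            // start a new single-offset range
            ranges.put(offset, offset);
        }
    }

    // highest offset that is safe to commit: top of the first contiguous range
    public long committable() {
        return ranges.isEmpty() ? -1L : ranges.firstEntry().getValue();
    }

    public static void main(String[] args) {
        AckRanges r = new AckRanges();
        r.ack(4);
        r.ack(0);
        r.ack(1);
        r.ack(3);
        System.out.println(r.committable()); // prints 1
        r.ack(2);
        System.out.println(r.committable()); // prints 4
    }
}
```

Acking 4, 0, 1, 3 leaves the ranges {0→1, 3→4}, so only offset 1 is committable; acking 2 then bridges them into {0→4}. This is the same behaviour KafkaSpoutTest.ack() below verifies by expecting a commit of OffsetAndMetadata(2).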
diff --git
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/TopicPatternProvider.java
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/TopicPatternProvider.java
new file mode 100644
index 0000000..1448609
--- /dev/null
+++
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/TopicPatternProvider.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2019
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.heron.spouts.kafka;
+
+import java.io.Serializable;
+import java.util.regex.Pattern;
+
+/**
+ * convenience interface for providing a topic matching pattern
+ *
+ * @see org.apache.kafka.clients.consumer.Consumer#subscribe(Pattern)
+ */
+public interface TopicPatternProvider extends Serializable {
+
+ /**
+ * @return a matching pattern for topics to subscribe to
+ */
+ Pattern create();
+}
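TopicPatternProvider is a single-method interface extending Serializable, so a lambda is the natural implementation. A minimal sketch (the inlined interface copy makes the example compile on its own, and the "events-.*" pattern is illustrative only):

```java
import java.io.Serializable;
import java.util.regex.Pattern;

public class TopicPatternProviderExample {
    // local copy of the interface so the sketch is self-contained
    interface TopicPatternProvider extends Serializable {
        Pattern create();
    }

    public static void main(String[] args) {
        // subscribe to every topic whose name starts with "events-"
        TopicPatternProvider provider = () -> Pattern.compile("events-.*");
        System.out.println(provider.create().matcher("events-clicks").matches()); // prints true
    }
}
```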
diff --git
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultConsumerRecordTransformerTest.java
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultConsumerRecordTransformerTest.java
new file mode 100644
index 0000000..c9d676e
--- /dev/null
+++
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultConsumerRecordTransformerTest.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2019
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.heron.spouts.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class DefaultConsumerRecordTransformerTest {
+ private static final String DEFAULT_STREAM = "default";
+ private ConsumerRecordTransformer<String, byte[]>
consumerRecordTransformer;
+
+ @BeforeEach
+ void setUp() {
+ consumerRecordTransformer = new DefaultConsumerRecordTransformer<>();
+ }
+
+ @Test
+ void getOutputStreams() {
+ assertEquals(Collections.singletonList(DEFAULT_STREAM),
consumerRecordTransformer.getOutputStreams());
+ }
+
+ @Test
+ void getFieldNames() {
+ assertEquals(Arrays.asList("key", "value"),
consumerRecordTransformer.getFieldNames(DEFAULT_STREAM));
+ }
+
+ @Test
+ void transform() {
+ ConsumerRecord<String, byte[]> consumerRecord = new
ConsumerRecord<>("partition", 0, 0, "key", new byte[]{0x1, 0x2, 0x3});
+ Map<String, List<Object>> expected =
Collections.singletonMap(DEFAULT_STREAM, Arrays.asList(consumerRecord.key(),
consumerRecord.value()));
+ assertEquals(expected,
consumerRecordTransformer.transform(consumerRecord));
+ }
+}
\ No newline at end of file
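Beyond the default key/value transformer exercised above, the ConsumerRecordTransformer contract also allows routing records to multiple output streams (per the commit message). A sketch of that idea (the stream names and size threshold are hypothetical, and a minimal local Record type stands in for Kafka's ConsumerRecord so the example is self-contained):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class TwoStreamTransformerSketch {
    // minimal stand-in for org.apache.kafka.clients.consumer.ConsumerRecord
    static class Record {
        final String key;
        final byte[] value;

        Record(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    // mirrors the ConsumerRecordTransformer contract: declare the streams and
    // their fields, then route each record to one (or more) of the streams
    static List<String> getOutputStreams() {
        return Arrays.asList("small", "large");
    }

    static List<String> getFieldNames(String stream) {
        return Arrays.asList("key", "value");
    }

    static Map<String, List<Object>> transform(Record record) {
        // values up to 1 KiB go to "small", larger ones to "large"
        String stream = record.value.length <= 1024 ? "small" : "large";
        return Collections.singletonMap(stream, Arrays.asList(record.key, record.value));
    }

    public static void main(String[] args) {
        Map<String, List<Object>> out = transform(new Record("k", new byte[]{1, 2}));
        System.out.println(out.keySet()); // prints [small]
    }
}
```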
diff --git
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultKafkaConsumerFactoryTest.java
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultKafkaConsumerFactoryTest.java
new file mode 100644
index 0000000..205053c
--- /dev/null
+++
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultKafkaConsumerFactoryTest.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2019
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.heron.spouts.kafka;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class DefaultKafkaConsumerFactoryTest {
+ private KafkaConsumerFactory<String, byte[]> kafkaConsumerFactory;
+
+ @BeforeEach
+ void setUp() {
+ Map<String, Object> config = new HashMap<>();
+ config.put("bootstrap.servers", "localhost:9092");
+ config.put("group.id", "tower-kafka-spout");
+ config.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
+ config.put("value.deserializer",
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(config);
+ }
+
+ @Test
+ void create() {
+ try (Consumer<String, byte[]> consumer =
kafkaConsumerFactory.create()) {
+ assertTrue(consumer instanceof KafkaConsumer);
+ }
+ }
+}
\ No newline at end of file
diff --git
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultTopicPatternProviderTest.java
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultTopicPatternProviderTest.java
new file mode 100644
index 0000000..21449e4
--- /dev/null
+++
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultTopicPatternProviderTest.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2019
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.heron.spouts.kafka;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.regex.Pattern;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class DefaultTopicPatternProviderTest {
+
+ @Test
+ void create() {
+ assertEquals(Pattern.compile("a").pattern(), new
DefaultTopicPatternProvider("a").create().pattern());
+ }
+}
\ No newline at end of file
diff --git
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/KafkaMetricDecoratorTest.java
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/KafkaMetricDecoratorTest.java
new file mode 100644
index 0000000..d1bb516
--- /dev/null
+++
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/KafkaMetricDecoratorTest.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2019
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.heron.spouts.kafka;
+
+import org.apache.kafka.common.Metric;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class KafkaMetricDecoratorTest {
+ @Mock
+ private Metric metric;
+
+ @BeforeEach
+ void setUp() {
+ when(metric.metricValue()).thenReturn("dummy value");
+ }
+
+ @Test
+ void getValueAndReset() {
+ assertEquals("dummy value", new
KafkaMetricDecorator<>(metric).getValueAndReset());
+ }
+}
\ No newline at end of file
diff --git
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/KafkaSpoutTest.java
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/KafkaSpoutTest.java
new file mode 100644
index 0000000..3fa026e
--- /dev/null
+++
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/KafkaSpoutTest.java
@@ -0,0 +1,262 @@
+/*
+ * Copyright 2019
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.heron.spouts.kafka;
+
+import com.twitter.heron.api.Config;
+import com.twitter.heron.api.metric.IMetric;
+import com.twitter.heron.api.spout.SpoutOutputCollector;
+import com.twitter.heron.api.topology.OutputFieldsDeclarer;
+import com.twitter.heron.api.topology.TopologyContext;
+import com.twitter.heron.api.tuple.Fields;
+import com.twitter.heron.common.basics.SingletonRegistry;
+import com.twitter.heron.common.config.SystemConfig;
+import com.twitter.heron.common.config.SystemConfigKey;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.time.Duration;
+import java.util.*;
+import java.util.regex.Pattern;
+
+import static
com.twitter.heron.api.Config.TopologyReliabilityMode.ATLEAST_ONCE;
+import static com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE;
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.*;
+
+@ExtendWith(MockitoExtension.class)
+class KafkaSpoutTest {
+ private static final Random random = new Random();
+ private static final String DUMMY_TOPIC_NAME = "topic";
+ private KafkaSpout<String, byte[]> kafkaSpout;
+ @Mock
+ private KafkaConsumerFactory<String, byte[]> kafkaConsumerFactory;
+ @Mock
+ private Consumer<String, byte[]> consumer;
+ @Mock
+ private TopologyContext topologyContext;
+ @Mock
+ private SpoutOutputCollector collector;
+ @Mock
+ private Metric metric;
+ @Captor
+ private ArgumentCaptor<Pattern> patternArgumentCaptor;
+ @Captor
+ private ArgumentCaptor<IMetric<Object>> kafkaMetricDecoratorArgumentCaptor;
+ @Mock
+ private OutputFieldsDeclarer declarer;
+ @Captor
+ private ArgumentCaptor<Fields> fieldsArgumentCaptor;
+ @Captor
+ private ArgumentCaptor<List<Object>> listArgumentCaptor;
+ @Captor
+ private ArgumentCaptor<ConsumerRebalanceListener>
consumerRebalanceListenerArgumentCaptor;
+
+ @BeforeAll
+ static void setUpAll() {
+ if
(!SingletonRegistry.INSTANCE.containsSingleton(SystemConfig.HERON_SYSTEM_CONFIG))
{
+
SingletonRegistry.INSTANCE.registerSingleton(SystemConfig.HERON_SYSTEM_CONFIG,
SystemConfig.newBuilder(true)
+ .put(SystemConfigKey.HERON_METRICS_EXPORT_INTERVAL, 60)
+ .build());
+ }
+ }
+
+ @BeforeEach
+ void setUp() {
+ kafkaSpout = new KafkaSpout<>(kafkaConsumerFactory,
Collections.singleton(DUMMY_TOPIC_NAME));
+ }
+
+ @Test
+ void getConsumerRecordTransformer() {
+ assertTrue(kafkaSpout.getConsumerRecordTransformer() instanceof
DefaultConsumerRecordTransformer);
+
+ }
+
+ @Test
+ void setConsumerRecordTransformer() {
+ ConsumerRecordTransformer<String, byte[]> consumerRecordTransformer =
new DefaultConsumerRecordTransformer<>();
+ kafkaSpout.setConsumerRecordTransformer(consumerRecordTransformer);
+ assertEquals(consumerRecordTransformer,
kafkaSpout.getConsumerRecordTransformer());
+ }
+
+ @Test
+ void open() {
+ when(kafkaConsumerFactory.create()).thenReturn(consumer);
+
+
kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE,
ATMOST_ONCE.name()), topologyContext, collector);
+
verify(consumer).subscribe(eq(Collections.singleton(DUMMY_TOPIC_NAME)),
any(KafkaSpout.KafkaConsumerRebalanceListener.class));
+
+ kafkaSpout = new KafkaSpout<>(kafkaConsumerFactory, new
DefaultTopicPatternProvider("a"));
+
kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE,
ATMOST_ONCE.name()), topologyContext, collector);
+ verify(consumer).subscribe(patternArgumentCaptor.capture(),
any(KafkaSpout.KafkaConsumerRebalanceListener.class));
+ assertEquals("a", patternArgumentCaptor.getValue().pattern());
+ }
+
+ @Test
+ void nextTuple() {
+ when(kafkaConsumerFactory.create()).thenReturn(consumer);
+ ConsumerRecords<String, byte[]> consumerRecords = new
ConsumerRecords<>(Collections.singletonMap(new TopicPartition(DUMMY_TOPIC_NAME,
0), Collections.singletonList(new ConsumerRecord<>(DUMMY_TOPIC_NAME, 0, 0,
"key", new byte[]{0xF}))));
+ when(consumer.poll(any(Duration.class))).thenReturn(consumerRecords);
+ doReturn(Collections.singletonMap(new MetricName("name", "group",
"description", Collections.singletonMap("name", "value")),
metric)).when(consumer).metrics();
+ when(metric.metricValue()).thenReturn("sample value");
+
+
kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE,
ATMOST_ONCE.name()), topologyContext, collector);
+
verify(consumer).subscribe(eq(Collections.singleton(DUMMY_TOPIC_NAME)),
consumerRebalanceListenerArgumentCaptor.capture());
+ ConsumerRebalanceListener consumerRebalanceListener =
consumerRebalanceListenerArgumentCaptor.getValue();
+ TopicPartition topicPartition = new TopicPartition(DUMMY_TOPIC_NAME,
0);
+
consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(topicPartition));
+
+ kafkaSpout.nextTuple();
+ verify(consumer).commitAsync();
+ verify(topologyContext).registerMetric(eq("name-group-name-value"),
kafkaMetricDecoratorArgumentCaptor.capture(), eq(60));
+ assertEquals("sample value",
kafkaMetricDecoratorArgumentCaptor.getValue().getValueAndReset());
+
+ kafkaSpout.nextTuple();
+ verify(collector).emit(eq("default"), listArgumentCaptor.capture());
+ assertEquals("key", listArgumentCaptor.getValue().get(0));
+ assertArrayEquals(new byte[]{0xF}, (byte[])
listArgumentCaptor.getValue().get(1));
+ }
+
+ @Test
+ void ack() {
+ when(kafkaConsumerFactory.create()).thenReturn(consumer);
+ TopicPartition topicPartition = new TopicPartition(DUMMY_TOPIC_NAME,
0);
+ List<ConsumerRecord<String, byte[]>> recordList = new ArrayList<>();
+ byte[] randomBytes = new byte[1];
+ for (int i = 0; i < 5; i++) {
+ random.nextBytes(randomBytes);
+ recordList.add(new ConsumerRecord<>(DUMMY_TOPIC_NAME, 0, i, "key",
Arrays.copyOf(randomBytes, randomBytes.length)));
+ }
+ ConsumerRecords<String, byte[]> consumerRecords = new
ConsumerRecords<>(Collections.singletonMap(topicPartition, recordList));
+ when(consumer.poll(any(Duration.class))).thenReturn(consumerRecords);
+
+
kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE,
ATLEAST_ONCE.name()), topologyContext, collector);
+ //poll the topic
+ kafkaSpout.nextTuple();
+ //emit all of the five records
+ for (int i = 0; i < 5; i++) {
+ kafkaSpout.nextTuple();
+ }
+ //ack came in out of order and the third record is not acknowledged
+ kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition,
4));
+ kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition,
0));
+ kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition,
1));
+ kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition,
3));
+ //commit and poll
+ kafkaSpout.nextTuple();
+ verify(consumer).commitAsync(Collections.singletonMap(topicPartition,
new OffsetAndMetadata(2)), null);
+ }
+
+ @Test
+ void fail() {
+ when(kafkaConsumerFactory.create()).thenReturn(consumer);
+ TopicPartition topicPartition = new TopicPartition(DUMMY_TOPIC_NAME,
0);
+ List<ConsumerRecord<String, byte[]>> recordList = new ArrayList<>();
+ byte[] randomBytes = new byte[1];
+ for (int i = 0; i < 5; i++) {
+ random.nextBytes(randomBytes);
+ recordList.add(new ConsumerRecord<>(DUMMY_TOPIC_NAME, 0, i, "key",
Arrays.copyOf(randomBytes, randomBytes.length)));
+ }
+ ConsumerRecords<String, byte[]> consumerRecords = new
ConsumerRecords<>(Collections.singletonMap(topicPartition, recordList));
+ when(consumer.poll(any(Duration.class))).thenReturn(consumerRecords);
+
+
kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE,
ATLEAST_ONCE.name()), topologyContext, collector);
+ //poll the topic
+ kafkaSpout.nextTuple();
+ //emit all of the five records
+ for (int i = 0; i < 5; i++) {
+ kafkaSpout.nextTuple();
+ }
+ //acks come in out of order; the second and third records fail
+ kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 4));
+ kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 0));
+ kafkaSpout.fail(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 1));
+ kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 3));
+ kafkaSpout.fail(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 2));
+ //commit and poll
+ kafkaSpout.nextTuple();
+ verify(consumer).seek(topicPartition, 1);
+ verify(consumer).commitAsync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(1)), null);
+ }
+
+ @Test
+ void close() {
+ when(kafkaConsumerFactory.create()).thenReturn(consumer);
+ kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE, ATMOST_ONCE.name()), topologyContext, collector);
+ kafkaSpout.close();
+ verify(consumer).close();
+ }
+
+ @Test
+ void declareOutputFields() {
+ kafkaSpout.declareOutputFields(declarer);
+ verify(declarer).declareStream(eq("default"), fieldsArgumentCaptor.capture());
+ assertEquals(Arrays.asList("key", "value"), fieldsArgumentCaptor.getValue().toList());
+ }
+
+ @Test
+ void consumerRebalanceListener() {
+ when(kafkaConsumerFactory.create()).thenReturn(consumer);
+
+ kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE, ATLEAST_ONCE.name()), topologyContext, collector);
+ verify(consumer).subscribe(eq(Collections.singleton(DUMMY_TOPIC_NAME)), consumerRebalanceListenerArgumentCaptor.capture());
+ ConsumerRebalanceListener consumerRebalanceListener = consumerRebalanceListenerArgumentCaptor.getValue();
+ TopicPartition topicPartition = new TopicPartition(DUMMY_TOPIC_NAME, 0);
+ consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(topicPartition));
+ verify(consumer).position(topicPartition, Duration.ofSeconds(5));
+
+ kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 0));
+ kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 1));
+ consumerRebalanceListener.onPartitionsRevoked(Collections.singleton(topicPartition));
+ verify(consumer).commitAsync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(2)), null);
+ }
+
+ @Test
+ void activate() {
+ when(kafkaConsumerFactory.create()).thenReturn(consumer);
+ kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE, ATMOST_ONCE.name()), topologyContext, collector);
+ verify(consumer).subscribe(eq(Collections.singleton(DUMMY_TOPIC_NAME)), consumerRebalanceListenerArgumentCaptor.capture());
+ ConsumerRebalanceListener consumerRebalanceListener = consumerRebalanceListenerArgumentCaptor.getValue();
+ TopicPartition topicPartition = new TopicPartition(DUMMY_TOPIC_NAME, 0);
+ consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(topicPartition));
+ kafkaSpout.activate();
+ verify(consumer).resume(Collections.singleton(topicPartition));
+ }
+
+ @Test
+ void deactivate() {
+ when(kafkaConsumerFactory.create()).thenReturn(consumer);
+ kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE, ATMOST_ONCE.name()), topologyContext, collector);
+ verify(consumer).subscribe(eq(Collections.singleton(DUMMY_TOPIC_NAME)), consumerRebalanceListenerArgumentCaptor.capture());
+ ConsumerRebalanceListener consumerRebalanceListener = consumerRebalanceListenerArgumentCaptor.getValue();
+ TopicPartition topicPartition = new TopicPartition(DUMMY_TOPIC_NAME, 0);
+ consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(topicPartition));
+ kafkaSpout.deactivate();
+ verify(consumer).pause(Collections.singleton(topicPartition));
+ }
+}
\ No newline at end of file
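The ack/fail tests above pin down the spout's at-least-once offset bookkeeping: acks can arrive out of order, the committed offset advances only to the end of the longest contiguous run of acknowledged records, and on failure the consumer seeks back to the earliest failed offset for redelivery. A minimal standalone sketch of that bookkeeping follows; the class and method names (`AckRegistry`, `nextCommitOffset`, `seekOffset`) are hypothetical illustrations, not the spout's actual internals.

```java
import java.util.TreeSet;

// Hypothetical sketch of at-least-once offset bookkeeping (not the
// spout's real implementation): track out-of-order acks and failures
// per partition, and expose the offset that is safe to commit.
class AckRegistry {
  private final TreeSet<Long> acked = new TreeSet<>();
  private final TreeSet<Long> failed = new TreeSet<>();
  private long committedOffset; // next offset whose ack we are waiting for

  AckRegistry(long startOffset) {
    this.committedOffset = startOffset;
  }

  void ack(long offset) {
    acked.add(offset);
  }

  void fail(long offset) {
    failed.add(offset);
  }

  // Offset to commit: advance past every contiguously acked record,
  // stopping at the first gap (a record not yet acked, or failed).
  long nextCommitOffset() {
    while (acked.contains(committedOffset)) {
      acked.remove(committedOffset);
      committedOffset++;
    }
    return committedOffset;
  }

  // Earliest failed offset to seek back to for redelivery, or -1 if none.
  long seekOffset() {
    return failed.isEmpty() ? -1 : failed.first();
  }
}
```

With this model, acking offsets 4, 0, 1, 3 from a start of 0 yields a commit offset of 2 (the run 0..1 is contiguous, 2 is missing), matching the first test's `OffsetAndMetadata(2)`; acking 4, 0, 3 while failing 1 and 2 yields a commit offset of 1 and a seek to offset 1, matching the `fail` test.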
diff --git a/contrib/spouts/kafka/java/heron-kafka-spout-parent/pom.xml b/contrib/spouts/kafka/java/heron-kafka-spout-parent/pom.xml
new file mode 100644
index 0000000..bd1b5e97
--- /dev/null
+++ b/contrib/spouts/kafka/java/heron-kafka-spout-parent/pom.xml
@@ -0,0 +1,76 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright 2019
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.heron</groupId>
+ <artifactId>heron-kafka-spout-parent</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ <packaging>pom</packaging>
+
+ <properties>
+ <maven.compiler.source>1.8</maven.compiler.source>
+ <maven.compiler.target>1.8</maven.compiler.target>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <modules>
+ <module>heron-kafka-spout</module>
+ <module>heron-kafka-spout-sample</module>
+ </modules>
+
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>com.twitter.heron</groupId>
+ <artifactId>heron-api</artifactId>
+ <version>0.17.8</version>
+ </dependency>
+ <dependency>
+ <groupId>com.twitter.heron</groupId>
+ <artifactId>heron-storm</artifactId>
+ <version>0.17.8</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>2.1.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ <version>1.7.26</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
+ <build>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.22.1</version>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+
+</project>
\ No newline at end of file