This is an automated email from the ASF dual-hosted git repository.
nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 8fb3945 Kafka spout bazel build (#3254)
8fb3945 is described below
commit 8fb39456f7e172fed70031755f05c0f7d6d5768b
Author: SiMing Weng <[email protected]>
AuthorDate: Thu May 16 00:00:34 2019 -0400
Kafka spout bazel build (#3254)
* convert the Kafka spout subproject to a Bazel build
* change imports to the Apache package name
* sort the order of dependencies
* trigger unit tests for the Kafka spout in Travis
* fix checkstyle errors
* fix more checkstyle errors
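
In practice, the package-rename step means topology code swaps its imports
from the old com.twitter.heron prefix to the org.apache.heron one, e.g. (a
representative pair taken from the diff below):

    // before
    import com.twitter.heron.api.topology.TopologyBuilder;
    // after
    import org.apache.heron.api.topology.TopologyBuilder;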
---
.travis.yml | 2 -
WORKSPACE | 2 +-
.../heron-kafka-spout-sample/pom.xml | 42 --
.../heron-kafka-spout-sample/src/main/java/BUILD | 18 +
.../sample/HeronKafkaSpoutSampleTopology.java | 162 +++--
.../heron-kafka-spout/pom.xml | 60 --
.../heron-kafka-spout/src/main/java/BUILD | 18 +
.../spouts/kafka/ConsumerRecordTransformer.java | 43 +-
.../kafka/DefaultConsumerRecordTransformer.java | 20 +-
.../spouts/kafka/DefaultKafkaConsumerFactory.java | 57 +-
.../spouts/kafka/DefaultTopicPatternProvider.java | 52 +-
.../heron/spouts/kafka/KafkaConsumerFactory.java | 37 +-
.../heron/spouts/kafka/KafkaMetricDecorator.java | 39 +-
.../org/apache/heron/spouts/kafka/KafkaSpout.java | 794 +++++++++++----------
.../heron/spouts/kafka/TopicPatternProvider.java | 28 +-
.../heron-kafka-spout/src/test/java/BUILD | 44 ++
.../DefaultConsumerRecordTransformerTest.java | 87 +--
.../kafka/DefaultKafkaConsumerFactoryTest.java | 66 +-
.../kafka/DefaultTopicPatternProviderTest.java | 37 +-
.../spouts/kafka/KafkaMetricDecoratorTest.java | 57 +-
.../apache/heron/spouts/kafka/KafkaSpoutTest.java | 492 +++++++------
.../kafka/java/heron-kafka-spout-parent/pom.xml | 76 --
.../eco/builder/heron/HeronStreamBuilderTest.java | 3 +-
scripts/get_all_heron_paths.sh | 2 +-
scripts/travis/build.sh | 4 +-
25 files changed, 1148 insertions(+), 1094 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index d9aafc7..464e5b4 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -49,5 +49,3 @@ script:
- which python2.7
- python2.7 -V
- scripts/travis/ci.sh
- # run unit tests for heron kafka spout
- - mvn -f contrib/spouts/kafka/java/heron-kafka-spout-parent/pom.xml test
diff --git a/WORKSPACE b/WORKSPACE
index e56e40f..9172be7 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -454,7 +454,7 @@ maven_jar(
maven_jar(
name = "org_apache_kafka_kafka_clients",
- artifact = "org.apache.kafka:kafka-clients:0.8.2.1",
+ artifact = "org.apache.kafka:kafka-clients:2.2.0",
)
maven_jar(
diff --git a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout-sample/pom.xml b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout-sample/pom.xml
deleted file mode 100644
index 9eb1571..0000000
--- a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout-sample/pom.xml
+++ /dev/null
@@ -1,42 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Copyright 2019
- ~
- ~ Licensed under the Apache License, Version 2.0 (the "License");
- ~ you may not use this file except in compliance with the License.
- ~ You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns="http://maven.apache.org/POM/4.0.0"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>heron-kafka-spout-parent</artifactId>
- <groupId>org.apache.heron</groupId>
- <version>1.0-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>heron-kafka-spout-sample</artifactId>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.heron</groupId>
- <artifactId>heron-kafka-spout</artifactId>
- <version>1.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>com.twitter.heron</groupId>
- <artifactId>heron-storm</artifactId>
- </dependency>
- </dependencies>
-
-</project>
\ No newline at end of file
diff --git a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout-sample/src/main/java/BUILD b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout-sample/src/main/java/BUILD
new file mode 100644
index 0000000..04d69b0
--- /dev/null
+++ b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout-sample/src/main/java/BUILD
@@ -0,0 +1,18 @@
+licenses(["notice"])
+
+package(default_visibility = ["//visibility:public"])
+
+heron_kafka_spout_sample_dep = [
+ "//contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java:heron-kafka-spout-java",
+ "//heron/api/src/java:api-java-low-level",
+ "//heron/common/src/java:basics-java",
+ "//heron/simulator/src/java:simulator-java",
+ "@org_apache_kafka_kafka_clients//jar",
+ "@org_slf4j_slf4j_api//jar",
+]
+
+java_binary(
+ name = "heron-kafka-spout-java-sample",
+ srcs = glob(["org/apache/heron/spouts/kafka/**/*.java"]),
+ deps = heron_kafka_spout_sample_dep,
+)
\ No newline at end of file
diff --git a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout-sample/src/main/java/org/apache/heron/spouts/kafka/sample/HeronKafkaSpoutSampleTopology.java b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout-sample/src/main/java/org/apache/heron/spouts/kafka/sample/HeronKafkaSpoutSampleTopology.java
index 9a7de40..c63a6da 100644
--- a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout-sample/src/main/java/org/apache/heron/spouts/kafka/sample/HeronKafkaSpoutSampleTopology.java
+++ b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout-sample/src/main/java/org/apache/heron/spouts/kafka/sample/HeronKafkaSpoutSampleTopology.java
@@ -1,12 +1,14 @@
-/*
- * Copyright 2019
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,80 +18,90 @@
package org.apache.heron.spouts.kafka.sample;
-import com.twitter.heron.api.Config;
-import com.twitter.heron.api.bolt.BaseRichBolt;
-import com.twitter.heron.api.bolt.OutputCollector;
-import com.twitter.heron.api.topology.OutputFieldsDeclarer;
-import com.twitter.heron.api.topology.TopologyBuilder;
-import com.twitter.heron.api.topology.TopologyContext;
-import com.twitter.heron.api.tuple.Tuple;
-import com.twitter.heron.common.basics.ByteAmount;
-import com.twitter.heron.simulator.Simulator;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.heron.api.Config;
+import org.apache.heron.api.bolt.BaseRichBolt;
+import org.apache.heron.api.bolt.OutputCollector;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Tuple;
+import org.apache.heron.common.basics.ByteAmount;
+import org.apache.heron.simulator.Simulator;
import org.apache.heron.spouts.kafka.DefaultKafkaConsumerFactory;
import org.apache.heron.spouts.kafka.KafkaConsumerFactory;
import org.apache.heron.spouts.kafka.KafkaSpout;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+public final class HeronKafkaSpoutSampleTopology {
+ private static final Logger LOG = LoggerFactory.getLogger(HeronKafkaSpoutSampleTopology.class);
+ private static final String KAFKA_SPOUT_NAME = "kafka-spout";
+ private static final String LOGGING_BOLT_NAME = "logging-bolt";
+
+ private HeronKafkaSpoutSampleTopology() {
+ }
+
+ public static void main(String[] args) {
+ Map<String, Object> kafkaConsumerConfig = new HashMap<>();
+ kafkaConsumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ kafkaConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-kafka-spout");
+ kafkaConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.StringDeserializer");
+ kafkaConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.StringDeserializer");
+ LOG.info("Kafka Consumer Config: {}", kafkaConsumerConfig);
+
+ KafkaConsumerFactory<String, String> kafkaConsumerFactory =
+ new DefaultKafkaConsumerFactory<>(kafkaConsumerConfig);
+
+ TopologyBuilder topologyBuilder = new TopologyBuilder();
+ topologyBuilder.setSpout(KAFKA_SPOUT_NAME, new KafkaSpout<>(kafkaConsumerFactory,
+ Collections.singletonList("test-topic")));
+ topologyBuilder.setBolt(LOGGING_BOLT_NAME, new LoggingBolt()).shuffleGrouping(KAFKA_SPOUT_NAME);
+ Config config = new Config();
+ config.setNumStmgrs(1);
+ config.setContainerCpuRequested(1);
+ config.setContainerRamRequested(ByteAmount.fromGigabytes(1));
+ config.setContainerDiskRequested(ByteAmount.fromGigabytes(1));
+
+ config.setComponentCpu(KAFKA_SPOUT_NAME, 0.25);
+ config.setComponentRam(KAFKA_SPOUT_NAME, ByteAmount.fromMegabytes(256));
+ config.setComponentDisk(KAFKA_SPOUT_NAME, ByteAmount.fromMegabytes(512));
+
+ config.setComponentCpu(LOGGING_BOLT_NAME, 0.25);
+ config.setComponentRam(LOGGING_BOLT_NAME, ByteAmount.fromMegabytes(256));
+ config.setComponentDisk(LOGGING_BOLT_NAME, ByteAmount.fromMegabytes(256));
+
+ Simulator simulator = new Simulator();
+ simulator.submitTopology("heron-kafka-spout-sample-topology", config,
+ topologyBuilder.createTopology());
+ }
+
+ public static class LoggingBolt extends BaseRichBolt {
+ private static final Logger LOG = LoggerFactory.getLogger(LoggingBolt.class);
+ private transient OutputCollector outputCollector;
+
+ @Override
+ public void prepare(Map<String, Object> heronConf, TopologyContext context,
+ OutputCollector collector) {
+ this.outputCollector = collector;
+ }
-public class HeronKafkaSpoutSampleTopology {
- private static final Logger LOG = LoggerFactory.getLogger(HeronKafkaSpoutSampleTopology.class);
- private static final String KAFKA_SPOUT_NAME = "kafka-spout";
- private static final String LOGGING_BOLT_NAME = "logging-bolt";
-
- public static void main(String[] args) {
- Map<String, Object> kafkaConsumerConfig = new HashMap<>();
- kafkaConsumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- kafkaConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-kafka-spout");
- kafkaConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
- kafkaConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
- LOG.info("Kafka Consumer Config: {}", kafkaConsumerConfig);
-
- KafkaConsumerFactory<String, String> kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(kafkaConsumerConfig);
-
- TopologyBuilder topologyBuilder = new TopologyBuilder();
- topologyBuilder.setSpout(KAFKA_SPOUT_NAME, new KafkaSpout<>(kafkaConsumerFactory, Collections.singletonList("test-topic")));
- topologyBuilder.setBolt(LOGGING_BOLT_NAME, new LoggingBolt()).shuffleGrouping(KAFKA_SPOUT_NAME);
- Config config = new Config();
- config.setNumStmgrs(1);
- config.setContainerCpuRequested(1);
- config.setContainerRamRequested(ByteAmount.fromGigabytes(1));
- config.setContainerDiskRequested(ByteAmount.fromGigabytes(1));
-
- config.setComponentCpu(KAFKA_SPOUT_NAME, 0.25);
- config.setComponentRam(KAFKA_SPOUT_NAME, ByteAmount.fromMegabytes(256));
- config.setComponentDisk(KAFKA_SPOUT_NAME, ByteAmount.fromMegabytes(512));
-
- config.setComponentCpu(LOGGING_BOLT_NAME, 0.25);
- config.setComponentRam(LOGGING_BOLT_NAME, ByteAmount.fromMegabytes(256));
- config.setComponentDisk(LOGGING_BOLT_NAME, ByteAmount.fromMegabytes(256));
-
- Simulator simulator = new Simulator();
- simulator.submitTopology("heron-kafka-spout-sample-topology", config, topologyBuilder.createTopology());
+ @Override
+ public void execute(Tuple input) {
+ LOG.info("{}", input);
+ outputCollector.ack(input);
}
- public static class LoggingBolt extends BaseRichBolt {
- private static final Logger LOG = LoggerFactory.getLogger(LoggingBolt.class);
- private transient OutputCollector outputCollector;
-
- @Override
- public void prepare(Map<String, Object> heronConf, TopologyContext context, OutputCollector collector) {
- this.outputCollector = collector;
- }
-
- @Override
- public void execute(Tuple input) {
- LOG.info("{}", input);
- outputCollector.ack(input);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- //do nothing
- }
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ //do nothing
}
+ }
}
diff --git a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/pom.xml b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/pom.xml
deleted file mode 100644
index 2c7e072..0000000
--- a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/pom.xml
+++ /dev/null
@@ -1,60 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Copyright 2019
- ~
- ~ Licensed under the Apache License, Version 2.0 (the "License");
- ~ you may not use this file except in compliance with the License.
- ~ You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns="http://maven.apache.org/POM/4.0.0"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.heron</groupId>
- <artifactId>heron-kafka-spout-parent</artifactId>
- <version>1.0-SNAPSHOT</version>
- </parent>
-
- <artifactId>heron-kafka-spout</artifactId>
-
- <dependencies>
- <dependency>
- <groupId>com.twitter.heron</groupId>
- <artifactId>heron-storm</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter-api</artifactId>
- <version>5.4.0</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter-engine</artifactId>
- <version>5.4.0</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-junit-jupiter</artifactId>
- <version>2.24.5</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
-</project>
\ No newline at end of file
diff --git a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/BUILD b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/BUILD
new file mode 100644
index 0000000..efa3736
--- /dev/null
+++ b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/BUILD
@@ -0,0 +1,18 @@
+licenses(["notice"])
+
+package(default_visibility = ["//visibility:public"])
+
+kafka_spout_deps = [
+ "//storm-compatibility/src/java:storm-compatibility-java-neverlink",
+ "//heron/api/src/java:api-java-low-level",
+ "//heron/common/src/java:basics-java",
+ "//heron/common/src/java:config-java",
+ "//third_party/java:logging",
+ "@org_apache_kafka_kafka_clients//jar",
+]
+
+java_library(
+ name = "heron-kafka-spout-java",
+ srcs = glob(["org/apache/heron/spouts/kafka/**/*.java"]),
+ deps = kafka_spout_deps,
+)
diff --git a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/ConsumerRecordTransformer.java b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/ConsumerRecordTransformer.java
index 51b4281..cf00b2f 100644
--- a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/ConsumerRecordTransformer.java
+++ b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/ConsumerRecordTransformer.java
@@ -1,12 +1,14 @@
-/*
- * Copyright 2019
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,14 +18,14 @@
package org.apache.heron.spouts.kafka;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
/**
* This is the transformer class whose responsibility is to:
*
@@ -33,22 +35,23 @@ import java.util.Map;
* <li>translate the incoming Kafka record into the list of values of the output tuple</li>
* </ol>
* <p>
- * The default behavior of the built-in transformer will output to stream "default", with 2 fields, "key" and "value" which are the key and value field of the incoming Kafka record.
+ * The default behavior of the built-in transformer will output to stream "default",
+ * with 2 fields, "key" and "value" which are the key and value field of the incoming Kafka record.
*
* @param <K> the type of the key of the Kafka record
* @param <V> the type of the value of the Kafka record
* @see KafkaSpout#setConsumerRecordTransformer(ConsumerRecordTransformer)
*/
public interface ConsumerRecordTransformer<K, V> extends Serializable {
- default List<String> getOutputStreams() {
- return Collections.singletonList("default");
- }
+ default List<String> getOutputStreams() {
+ return Collections.singletonList("default");
+ }
- default List<String> getFieldNames(String streamId) {
- return Arrays.asList("key", "value");
- }
+ default List<String> getFieldNames(String streamId) {
+ return Arrays.asList("key", "value");
+ }
- default Map<String, List<Object>> transform(ConsumerRecord<K, V> record) {
- return Collections.singletonMap("default", Arrays.asList(record.key(), record.value()));
- }
+ default Map<String, List<Object>> transform(ConsumerRecord<K, V> record) {
+ return Collections.singletonMap("default", Arrays.asList(record.key(), record.value()));
+ }
}
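
For context on the interface above: a custom transformer only needs to
override the three default methods. A hypothetical sketch (the class name,
stream id, and field names are illustrative, not part of this change) that
also exposes the record's topic and offset:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    // Hypothetical transformer emitting topic, offset and value on one stream.
    public class TopicOffsetValueTransformer<K, V> implements ConsumerRecordTransformer<K, V> {
      private static final long serialVersionUID = 1L;

      @Override
      public List<String> getOutputStreams() {
        return Collections.singletonList("records");
      }

      @Override
      public List<String> getFieldNames(String streamId) {
        return Arrays.asList("topic", "offset", "value");
      }

      @Override
      public Map<String, List<Object>> transform(ConsumerRecord<K, V> record) {
        return Collections.singletonMap("records",
            Arrays.asList(record.topic(), record.offset(), record.value()));
      }
    }

Such a transformer would be installed on the spout via
KafkaSpout#setConsumerRecordTransformer, per the @see tag above.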
diff --git a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultConsumerRecordTransformer.java b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultConsumerRecordTransformer.java
index ac00b4e..7202769 100644
--- a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultConsumerRecordTransformer.java
+++ b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultConsumerRecordTransformer.java
@@ -1,12 +1,14 @@
-/*
- * Copyright 2019
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,5 +20,5 @@ package org.apache.heron.spouts.kafka;
@SuppressWarnings("WeakerAccess")
public class DefaultConsumerRecordTransformer<K, V> implements ConsumerRecordTransformer<K, V> {
- private static final long serialVersionUID = -8971687732883148619L;
+ private static final long serialVersionUID = -8971687732883148619L;
}
diff --git a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultKafkaConsumerFactory.java b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultKafkaConsumerFactory.java
index bbc7ac5..1178352 100644
--- a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultKafkaConsumerFactory.java
+++ b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultKafkaConsumerFactory.java
@@ -1,12 +1,14 @@
-/*
- * Copyright 2019
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,35 +18,36 @@
package org.apache.heron.spouts.kafka;
+import java.util.Map;
+
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
-import java.util.Map;
-
/**
- * a simple Kafka Consumer factory that builds a KafkaConsumer instance from a {@link Map} as the properties to configure it.
+ * a simple Kafka Consumer factory that builds a KafkaConsumer instance from a {@link Map} as
+ * the properties to configure it.
*
* @param <K> the type of the key of the Kafka record
* @param <V> the type of the value of the Kafka record
*/
public class DefaultKafkaConsumerFactory<K, V> implements KafkaConsumerFactory<K, V> {
- private static final long serialVersionUID = -2346087278604915148L;
- private Map<String, Object> config;
+ private static final long serialVersionUID = -2346087278604915148L;
+ private Map<String, Object> config;
- /**
- * the config map, key strings should be from {@link ConsumerConfig}
- *
- * @param config the configuration map
- * @see <a href="https://kafka.apache.org/documentation/#consumerconfigs">Kafka Consumer Configs</a>
- */
- public DefaultKafkaConsumerFactory(Map<String, Object> config) {
- this.config = config;
- }
+ /**
+ * the config map, key strings should be from {@link ConsumerConfig}
+ *
+ * @param config the configuration map
+ * @see <a href="https://kafka.apache.org/documentation/#consumerconfigs">Kafka Consumer Configs</a>
+ */
+ public DefaultKafkaConsumerFactory(Map<String, Object> config) {
+ this.config = config;
+ }
- @Override
- public Consumer<K, V> create() {
- config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
- return new KafkaConsumer<>(config);
- }
+ @Override
+ public Consumer<K, V> create() {
+ config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ return new KafkaConsumer<>(config);
+ }
}
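
Note that create() force-disables Kafka's auto commit, so the spout stays in
charge of offsets; a caller-supplied ENABLE_AUTO_COMMIT_CONFIG value is
overwritten. A minimal usage sketch (broker address and group id are
placeholders):

    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                // placeholder
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
    // create() sets ENABLE_AUTO_COMMIT_CONFIG to "false" before building the consumer.
    Consumer<String, String> consumer =
        new DefaultKafkaConsumerFactory<String, String>(config).create();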
diff --git a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultTopicPatternProvider.java b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultTopicPatternProvider.java
index b48e2ca..bf0de6e 100644
--- a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultTopicPatternProvider.java
+++ b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultTopicPatternProvider.java
@@ -1,12 +1,14 @@
-/*
- * Copyright 2019
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,24 +24,24 @@ import java.util.regex.Pattern;
* the built-in default pattern provider to create a topic pattern out of a regex string
*/
public class DefaultTopicPatternProvider implements TopicPatternProvider {
- private static final long serialVersionUID = 5534026856505613199L;
- private String regex;
+ private static final long serialVersionUID = 5534026856505613199L;
+ private String regex;
- /**
- * create a provider out of a regular expression string
- *
- * @param regex topic name regular expression
- */
- @SuppressWarnings("WeakerAccess")
- public DefaultTopicPatternProvider(String regex) {
- this.regex = regex;
- }
+ /**
+ * create a provider out of a regular expression string
+ *
+ * @param regex topic name regular expression
+ */
+ @SuppressWarnings("WeakerAccess")
+ public DefaultTopicPatternProvider(String regex) {
+ this.regex = regex;
+ }
- @Override
- public Pattern create() {
- if (regex == null) {
- throw new IllegalArgumentException("regex can not be null");
- }
- return Pattern.compile(regex);
+ @Override
+ public Pattern create() {
+ if (regex == null) {
+ throw new IllegalArgumentException("regex can not be null");
}
+ return Pattern.compile(regex);
+ }
}
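
A short sketch of the pattern-based subscription path enabled by this
provider, assuming a kafkaConsumerConfig map built as in the sample topology
above (the regex is a placeholder):

    KafkaConsumerFactory<String, String> factory =
        new DefaultKafkaConsumerFactory<>(kafkaConsumerConfig);
    // Subscribe to every topic whose name starts with "events-".
    KafkaSpout<String, String> spout =
        new KafkaSpout<>(factory, new DefaultTopicPatternProvider("events-.*"));

The KafkaSpout constructor taking a TopicPatternProvider appears in the
KafkaSpout diff further below.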
diff --git a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaConsumerFactory.java b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaConsumerFactory.java
index c67aeac..6ac2d2e 100644
--- a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaConsumerFactory.java
+++ b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaConsumerFactory.java
@@ -1,12 +1,14 @@
-/*
- * Copyright 2019
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,21 +18,22 @@
package org.apache.heron.spouts.kafka;
-import org.apache.kafka.clients.consumer.Consumer;
-
import java.io.Serializable;
+import org.apache.kafka.clients.consumer.Consumer;
+
/**
- * the factory to create the underlying KafkaConsumer instance the Kafka Spout will be using to consume data from Kafka cluster
+ * the factory to create the underlying KafkaConsumer instance the Kafka Spout
+ * will be using to consume data from Kafka cluster
*
* @param <K> the type of the key of the Kafka record
* @param <V> the type of the value of the Kafka record
*/
public interface KafkaConsumerFactory<K, V> extends Serializable {
- /**
- * create the underlying KafkaConsumer
- *
- * @return kafka consumer instance
- */
- Consumer<K, V> create();
+ /**
+ * create the underlying KafkaConsumer
+ *
+ * @return kafka consumer instance
+ */
+ Consumer<K, V> create();
}
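
Because the factory is just a serializable supplier of a Consumer,
alternative implementations can adjust the consumer configuration before the
spout sees it. A hypothetical sketch (the class name and the max-poll tweak
are illustrative, not part of this change):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    // Hypothetical factory capping how many records each poll() may return.
    public class SmallBatchConsumerFactory<K, V> implements KafkaConsumerFactory<K, V> {
      private static final long serialVersionUID = 1L;
      private final Map<String, Object> config;

      public SmallBatchConsumerFactory(Map<String, Object> config) {
        this.config = new HashMap<>(config);
      }

      @Override
      public Consumer<K, V> create() {
        config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        return new KafkaConsumer<>(config);
      }
    }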
diff --git a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaMetricDecorator.java b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaMetricDecorator.java
index 9363c3a..55f718f 100644
--- a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaMetricDecorator.java
+++ b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaMetricDecorator.java
@@ -1,12 +1,14 @@
-/*
- * Copyright 2019
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,23 +18,24 @@
package org.apache.heron.spouts.kafka;
-import com.twitter.heron.api.metric.IMetric;
+import org.apache.heron.api.metric.IMetric;
import org.apache.kafka.common.Metric;
/**
- * a decorator to convert a Kafka Metric to a Heron Metric so that Kafka metrics can be exposed via Heron Metrics Manager
+ * a decorator to convert a Kafka Metric to a Heron Metric so that Kafka
+ * metrics can be exposed via Heron Metrics Manager
*
* @param <M> the Kafka Metric type
*/
public class KafkaMetricDecorator<M extends Metric> implements IMetric<Object> {
- private M metric;
+ private M metric;
- KafkaMetricDecorator(M metric) {
- this.metric = metric;
- }
+ KafkaMetricDecorator(M metric) {
+ this.metric = metric;
+ }
- @Override
- public Object getValueAndReset() {
- return metric.metricValue();
- }
+ @Override
+ public Object getValueAndReset() {
+ return metric.metricValue();
+ }
}
diff --git a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaSpout.java b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaSpout.java
index 6334001..bc13d0a 100644
--- a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaSpout.java
+++ b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaSpout.java
@@ -1,12 +1,14 @@
-/*
- * Copyright 2019
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,410 +18,458 @@
package org.apache.heron.spouts.kafka;
-import com.twitter.heron.api.Config;
-import com.twitter.heron.api.spout.BaseRichSpout;
-import com.twitter.heron.api.spout.SpoutOutputCollector;
-import com.twitter.heron.api.state.State;
-import com.twitter.heron.api.topology.IStatefulComponent;
-import com.twitter.heron.api.topology.OutputFieldsDeclarer;
-import com.twitter.heron.api.topology.TopologyContext;
-import com.twitter.heron.api.tuple.Fields;
-import com.twitter.heron.common.basics.SingletonRegistry;
-import com.twitter.heron.common.config.SystemConfig;
-import org.apache.kafka.clients.consumer.*;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.time.Duration;
-import java.util.*;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Queue;
+import java.util.Set;
+import java.util.SortedMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.heron.api.Config;
+import org.apache.heron.api.spout.BaseRichSpout;
+import org.apache.heron.api.spout.SpoutOutputCollector;
+import org.apache.heron.api.state.State;
+import org.apache.heron.api.topology.IStatefulComponent;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Fields;
+import org.apache.heron.common.basics.SingletonRegistry;
+import org.apache.heron.common.config.SystemConfig;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
+
/**
* Kafka spout to consume data from Kafka topic(s), each record is converted into a tuple via {@link ConsumerRecordTransformer}, and emitted into a topology
*
* @param <K> the type of the key field of the Kafka record
* @param <V> the type of the value field of the Kafka record
*/
-public class KafkaSpout<K, V> extends BaseRichSpout implements IStatefulComponent<TopicPartition, Long> {
- private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
- private static final long serialVersionUID = -2271355516537883361L;
- private int metricsIntervalInSecs = 60;
- private KafkaConsumerFactory<K, V> kafkaConsumerFactory;
- private TopicPatternProvider topicPatternProvider;
- private Collection<String> topicNames;
- private ConsumerRecordTransformer<K, V> consumerRecordTransformer = new DefaultConsumerRecordTransformer<>();
- private transient SpoutOutputCollector collector;
- private transient TopologyContext topologyContext;
- private transient Queue<ConsumerRecord<K, V>> buffer;
- private transient Consumer<K, V> consumer;
- private transient Set<TopicPartition> assignedPartitions;
- private transient Set<MetricName> reportedMetrics;
- private transient Map<TopicPartition, NavigableMap<Long, Long>> ackRegistry;
- private transient Map<TopicPartition, Long> failureRegistry;
- private Config.TopologyReliabilityMode topologyReliabilityMode = Config.TopologyReliabilityMode.ATMOST_ONCE;
- private long previousKafkaMetricsUpdatedTimestamp = 0;
- private State<TopicPartition, Long> state;
-
- /**
- * create a KafkaSpout instance that subscribes to a list of topics
- *
- * @param kafkaConsumerFactory kafka consumer factory
- * @param topicNames list of topic names
- */
- public KafkaSpout(KafkaConsumerFactory<K, V> kafkaConsumerFactory, Collection<String> topicNames) {
- this.kafkaConsumerFactory = kafkaConsumerFactory;
- this.topicNames = topicNames;
+public class KafkaSpout<K, V> extends BaseRichSpout
+ implements IStatefulComponent<TopicPartition, Long> {
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
+ private static final long serialVersionUID = -2271355516537883361L;
+ private int metricsIntervalInSecs = 60;
+ private KafkaConsumerFactory<K, V> kafkaConsumerFactory;
+ private TopicPatternProvider topicPatternProvider;
+ private Collection<String> topicNames;
+ private ConsumerRecordTransformer<K, V> consumerRecordTransformer =
+ new DefaultConsumerRecordTransformer<>();
+ private transient SpoutOutputCollector collector;
+ private transient TopologyContext topologyContext;
+ private transient Queue<ConsumerRecord<K, V>> buffer;
+ private transient Consumer<K, V> consumer;
+ private transient Set<TopicPartition> assignedPartitions;
+ private transient Set<MetricName> reportedMetrics;
+ private transient Map<TopicPartition, NavigableMap<Long, Long>> ackRegistry;
+ private transient Map<TopicPartition, Long> failureRegistry;
+ private Config.TopologyReliabilityMode topologyReliabilityMode =
+ Config.TopologyReliabilityMode.ATMOST_ONCE;
+ private long previousKafkaMetricsUpdatedTimestamp = 0;
+ private State<TopicPartition, Long> state;
+
+ /**
+ * create a KafkaSpout instance that subscribes to a list of topics
+ *
+ * @param kafkaConsumerFactory kafka consumer factory
+ * @param topicNames list of topic names
+ */
+ public KafkaSpout(KafkaConsumerFactory<K, V> kafkaConsumerFactory,
+ Collection<String> topicNames) {
+ this.kafkaConsumerFactory = kafkaConsumerFactory;
+ this.topicNames = topicNames;
+ }
+
+ /**
+ * create a KafkaSpout instance that subscribe to all topics matching the topic pattern
+ *
+ * @param kafkaConsumerFactory kafka consumer factory
+ * @param topicPatternProvider provider of the topic matching pattern
+ */
+ @SuppressWarnings("WeakerAccess")
+ public KafkaSpout(KafkaConsumerFactory<K, V> kafkaConsumerFactory,
+ TopicPatternProvider topicPatternProvider) {
+ this.kafkaConsumerFactory = kafkaConsumerFactory;
+ this.topicPatternProvider = topicPatternProvider;
+ }
+
+ /**
+ * return the consumer record transformer
+ *
+ * @return the Kafka record transformer instance used by this Kafka Spout
+ */
+ @SuppressWarnings("WeakerAccess")
+ public ConsumerRecordTransformer<K, V> getConsumerRecordTransformer() {
+ return consumerRecordTransformer;
+ }
+
+ /**
+ * set the Kafka record transformer
+ *
+ * @param consumerRecordTransformer kafka record transformer
+ */
+ @SuppressWarnings("WeakerAccess")
+ public void setConsumerRecordTransformer(ConsumerRecordTransformer<K, V>
+ consumerRecordTransformer) {
+ this.consumerRecordTransformer = consumerRecordTransformer;
+ }
+
+ @Override
+ public void initState(State<TopicPartition, Long> aState) {
+ this.state = aState;
+ LOG.info("initial state {}", aState);
+ }
+
+ @Override
+ public void preSave(String checkpointId) {
+ LOG.info("save state {}", state);
+ consumer.commitAsync(state.entrySet()
+ .stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, entry ->
+ new OffsetAndMetadata(entry.getValue() + 1))), null);
+ }
+
+ @Override
+ public void open(Map<String, Object> conf, TopologyContext context,
+ SpoutOutputCollector aCollector) {
+ this.collector = aCollector;
+ this.topologyContext = context;
+ this.topologyReliabilityMode = Config.TopologyReliabilityMode.valueOf(
+ conf.get(Config.TOPOLOGY_RELIABILITY_MODE).toString());
+ metricsIntervalInSecs = (int) ((SystemConfig) SingletonRegistry.INSTANCE
+ .getSingleton(SystemConfig.HERON_SYSTEM_CONFIG))
+ .getHeronMetricsExportInterval().getSeconds();
+ consumer = kafkaConsumerFactory.create();
+ if (topicNames != null) {
+ consumer.subscribe(topicNames, new KafkaConsumerRebalanceListener());
+ } else {
+ consumer.subscribe(topicPatternProvider.create(), new KafkaConsumerRebalanceListener());
}
-
- /**
- * create a KafkaSpout instance that subscribe to all topics matching the topic pattern
- *
- * @param kafkaConsumerFactory kafka consumer factory
- * @param topicPatternProvider provider of the topic matching pattern
- */
- @SuppressWarnings("WeakerAccess")
- public KafkaSpout(KafkaConsumerFactory<K, V> kafkaConsumerFactory, TopicPatternProvider topicPatternProvider) {
- this.kafkaConsumerFactory = kafkaConsumerFactory;
- this.topicPatternProvider = topicPatternProvider;
- }
-
- /**
- * @return the Kafka record transformer instance used by this Kafka Spout
- */
- @SuppressWarnings("WeakerAccess")
- public ConsumerRecordTransformer<K, V> getConsumerRecordTransformer() {
- return consumerRecordTransformer;
+ buffer = new ArrayDeque<>(500);
+ ackRegistry = new ConcurrentHashMap<>();
+ failureRegistry = new ConcurrentHashMap<>();
+ assignedPartitions = new HashSet<>();
+ reportedMetrics = new HashSet<>();
+ }
+
+ @Override
+ public void nextTuple() {
+ ConsumerRecord<K, V> record = buffer.poll();
+ if (record != null) {
+ // there are still records remaining for emission from the previous poll
+ emitConsumerRecord(record);
+ } else {
+ //all the records from previous poll have been
+ //emitted or this is very first time to poll
+ if (topologyReliabilityMode == Config.TopologyReliabilityMode.ATLEAST_ONCE) {
+ ackRegistry.forEach((key, value) -> {
+ if (value != null) {
+ //seek back to the earliest failed offset if there is any
+ rewindAndDiscardAck(key, value);
+ //commit based on the first continuous acknowledgement range
+ manualCommit(key, value);
+ }
+ });
+ }
+ poll().forEach(kvConsumerRecord -> buffer.offer(kvConsumerRecord));
}
+ }
- /**
- * set the Kafka record transformer
- *
- * @param consumerRecordTransformer kafka record transformer
- */
- @SuppressWarnings("WeakerAccess")
- public void setConsumerRecordTransformer(ConsumerRecordTransformer<K, V> consumerRecordTransformer) {
- this.consumerRecordTransformer = consumerRecordTransformer;
+ @Override
+ public void activate() {
+ super.activate();
+ if (!assignedPartitions.isEmpty()) {
+ consumer.resume(assignedPartitions);
}
+ }
- @Override
- public void initState(State<TopicPartition, Long> state) {
- this.state = state;
- LOG.info("initial state {}", state);
+ @Override
+ public void deactivate() {
+ super.deactivate();
+ if (!assignedPartitions.isEmpty()) {
+ consumer.pause(assignedPartitions);
}
-
- @Override
- public void preSave(String checkpointId) {
- LOG.info("save state {}", state);
- consumer.commitAsync(state.entrySet()
- .stream()
- .collect(Collectors.toMap(Map.Entry::getKey, entry -> new OffsetAndMetadata(entry.getValue() + 1))), null);
+ }
+
+ @Override
+ public void ack(Object msgId) {
+ super.ack(msgId);
+ long start = System.nanoTime();
+ ConsumerRecordMessageId consumerRecordMessageId = (ConsumerRecordMessageId) msgId;
+ TopicPartition topicPartition = consumerRecordMessageId.getTopicPartition();
+ long offset = consumerRecordMessageId.getOffset();
+ ackRegistry.putIfAbsent(topicPartition, new ConcurrentSkipListMap<>());
+ NavigableMap<Long, Long> navigableMap = ackRegistry.get(topicPartition);
+
+ Map.Entry<Long, Long> floorRange = navigableMap.floorEntry(offset);
+ Map.Entry<Long, Long> ceilingRange = navigableMap.ceilingEntry(offset);
+
+ long floorBottom = floorRange != null ? floorRange.getKey() : Long.MIN_VALUE;
+ long floorTop = floorRange != null ? floorRange.getValue() : Long.MIN_VALUE;
+ long ceilingBottom = ceilingRange != null ? ceilingRange.getKey() : Long.MAX_VALUE;
+ long ceilingTop = ceilingRange != null ? ceilingRange.getValue() : Long.MAX_VALUE;
+
+ //the ack is for a message that has already been acknowledged.
+ //This happens when a failed tuple has caused
+ //Kafka consumer to seek back to earlier position and some messages are replayed.
+ if ((offset >= floorBottom && offset <= floorTop)
+ || (offset >= ceilingBottom && offset <= ceilingTop)) {
+ return;
}
-
- @Override
- public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
- this.collector = collector;
- this.topologyContext = context;
- this.topologyReliabilityMode = Config.TopologyReliabilityMode.valueOf(conf.get(Config.TOPOLOGY_RELIABILITY_MODE).toString());
- metricsIntervalInSecs = (int) ((SystemConfig) SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG)).getHeronMetricsExportInterval().getSeconds();
- consumer = kafkaConsumerFactory.create();
- if (topicNames != null) {
- consumer.subscribe(topicNames, new KafkaConsumerRebalanceListener());
- } else {
- consumer.subscribe(topicPatternProvider.create(), new KafkaConsumerRebalanceListener());
- }
- buffer = new ArrayDeque<>(500);
- ackRegistry = new ConcurrentHashMap<>();
- failureRegistry = new ConcurrentHashMap<>();
- assignedPartitions = new HashSet<>();
- reportedMetrics = new HashSet<>();
+ if (ceilingBottom - floorTop == 2) {
+ //the ack connects the two adjacent range
+ navigableMap.put(floorBottom, ceilingTop);
+ navigableMap.remove(ceilingBottom);
+ } else if (offset == floorTop + 1) {
+ //the acknowledged offset is the immediate neighbour
+ // of the upper bound of the floor range
+ navigableMap.put(floorBottom, offset);
+ } else if (offset == ceilingBottom - 1) {
+ //the acknowledged offset is the immediate neighbour
+ // of the lower bound of the ceiling range
+ navigableMap.remove(ceilingBottom);
+ navigableMap.put(offset, ceilingTop);
+ } else {
+ //it is a new born range
+ navigableMap.put(offset, offset);
}
-
- @Override
- public void nextTuple() {
- ConsumerRecord<K, V> record = buffer.poll();
- if (record != null) {
- // there are still records remaining for emission from the previous poll
- emitConsumerRecord(record);
- } else {
- //all the records from previous poll have been emitted or this is very first time to poll
- if (topologyReliabilityMode == Config.TopologyReliabilityMode.ATLEAST_ONCE) {
- ackRegistry.forEach((key, value) -> {
- if (value != null) {
- //seek back to the earliest failed offset if there is any
- rewindAndDiscardAck(key, value);
- //commit based on the first continuous acknowledgement range
- manualCommit(key, value);
- }
- });
+ LOG.debug("ack {} in {} ns", msgId, System.nanoTime() - start);
+ LOG.debug("{}",
ackRegistry.get(consumerRecordMessageId.getTopicPartition()));
+ }
+
+ @Override
+ public void fail(Object msgId) {
+ super.fail(msgId);
+ ConsumerRecordMessageId consumerRecordMessageId = (ConsumerRecordMessageId) msgId;
+ TopicPartition topicPartition = consumerRecordMessageId.getTopicPartition();
+ long offset = consumerRecordMessageId.getOffset();
+ failureRegistry.put(topicPartition, Math.min(failureRegistry.getOrDefault(topicPartition,
+ Long.MAX_VALUE), offset));
+ LOG.warn("fail {}", msgId);
+ }
+
+ @Override
+ public void close() {
+ consumer.close();
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ consumerRecordTransformer.getOutputStreams()
+ .forEach(s -> declarer.declareStream(s,
+ new Fields(consumerRecordTransformer.getFieldNames(s))));
+ }
+
+ private void emitConsumerRecord(ConsumerRecord<K, V> record) {
+ consumerRecordTransformer.transform(record)
+ .forEach((s, objects) -> {
+ if (topologyReliabilityMode != Config.TopologyReliabilityMode.ATLEAST_ONCE) {
+ collector.emit(s, objects);
+ //only in effectively-once mode, we need to track the offset of the record that is just
+ //emitted into the topology
+ if (topologyReliabilityMode
+ == Config.TopologyReliabilityMode.EFFECTIVELY_ONCE) {
+ state.put(new TopicPartition(record.topic(), record.partition()),
+ record.offset());
}
- poll().forEach(kvConsumerRecord -> buffer.offer(kvConsumerRecord));
- }
+ } else {
+ //build message id based on topic, partition, offset of the consumer record
+ ConsumerRecordMessageId consumerRecordMessageId =
+ new ConsumerRecordMessageId(new TopicPartition(record.topic(),
+ record.partition()), record.offset());
+ //emit tuple with the message id
+ collector.emit(s, objects, consumerRecordMessageId);
+ }
+ });
+ }
+
+ private void rewindAndDiscardAck(TopicPartition topicPartition,
+ NavigableMap<Long, Long> ackRanges) {
+ if (failureRegistry.containsKey(topicPartition)) {
+ long earliestFailedOffset = failureRegistry.get(topicPartition);
+ //rewind back to the earliest failed offset
+ consumer.seek(topicPartition, earliestFailedOffset);
+ //discard the ack whose offset is greater than the earliest failed offset if there
+ //is any because we've rewound the consumer back
+ SortedMap<Long, Long> sortedMap = ackRanges.headMap(earliestFailedOffset);
+ if (!sortedMap.isEmpty()) {
+ sortedMap.put(sortedMap.lastKey(), Math.min(earliestFailedOffset,
+ sortedMap.get(sortedMap.lastKey())));
+ }
+ ackRegistry.put(topicPartition, new ConcurrentSkipListMap<>(sortedMap));
+ //failure for this partition has been dealt with
+ failureRegistry.remove(topicPartition);
}
-
- @Override
- public void activate() {
- super.activate();
- if (!assignedPartitions.isEmpty()) {
- consumer.resume(assignedPartitions);
- }
+ }
+
+ private void manualCommit(TopicPartition topicPartition, NavigableMap<Long, Long> ackRanges) {
+ //the first entry in the acknowledgement registry keeps track of the lowest possible
+ //offset that can be committed
+ Map.Entry<Long, Long> firstEntry = ackRanges.firstEntry();
+ if (firstEntry != null) {
+ consumer.commitAsync(Collections.singletonMap(topicPartition,
+ new OffsetAndMetadata(firstEntry.getValue() + 1)), null);
}
-
- @Override
- public void deactivate() {
- super.deactivate();
- if (!assignedPartitions.isEmpty()) {
- consumer.pause(assignedPartitions);
- }
+ }
+
+ private Iterable<ConsumerRecord<K, V>> poll() {
+ ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(200));
+ if (!records.isEmpty()) {
+ //since the Kafka Consumer metrics are built gradually based on the partitions it consumes,
+ //we need to periodically check whether there's any new metrics to register after
+ //each polling.
+ if (System.currentTimeMillis() - previousKafkaMetricsUpdatedTimestamp
+ > metricsIntervalInSecs) {
+ registerConsumerMetrics();
+ previousKafkaMetricsUpdatedTimestamp = System.currentTimeMillis();
+ }
+ if (topologyReliabilityMode == Config.TopologyReliabilityMode.ATMOST_ONCE) {
+ consumer.commitAsync();
+ }
+ return records;
}
-
- @Override
- public void ack(Object msgId) {
- super.ack(msgId);
- long start = System.nanoTime();
- ConsumerRecordMessageId consumerRecordMessageId = (ConsumerRecordMessageId) msgId;
- TopicPartition topicPartition = consumerRecordMessageId.getTopicPartition();
- long offset = consumerRecordMessageId.getOffset();
- ackRegistry.putIfAbsent(topicPartition, new ConcurrentSkipListMap<>());
- NavigableMap<Long, Long> navigableMap = ackRegistry.get(topicPartition);
-
- Map.Entry<Long, Long> floorRange = navigableMap.floorEntry(offset);
- Map.Entry<Long, Long> ceilingRange = navigableMap.ceilingEntry(offset);
-
- long floorBottom = floorRange != null ? floorRange.getKey() : Long.MIN_VALUE;
- long floorTop = floorRange != null ? floorRange.getValue() : Long.MIN_VALUE;
- long ceilingBottom = ceilingRange != null ? ceilingRange.getKey() : Long.MAX_VALUE;
- long ceilingTop = ceilingRange != null ? ceilingRange.getValue() : Long.MAX_VALUE;
-
- /*
- the ack is for a message that has already been acknowledged. This happens when a failed tuple has caused
- Kafka consumer to seek back to earlier position and some messages are replayed.
- */
- if ((offset >= floorBottom && offset <= floorTop) || (offset >= ceilingBottom && offset <= ceilingTop))
- return;
- if (ceilingBottom - floorTop == 2) {
- /*
- the ack connects the two adjacent range
- */
- navigableMap.put(floorBottom, ceilingTop);
- navigableMap.remove(ceilingBottom);
- } else if (offset == floorTop + 1) {
- /*
- the acknowledged offset is the immediate neighbour of the upper bound of the floor range
- */
- navigableMap.put(floorBottom, offset);
- } else if (offset == ceilingBottom - 1) {
- /*
- the acknowledged offset is the immediate neighbour of the lower bound of the ceiling range
- */
- navigableMap.remove(ceilingBottom);
- navigableMap.put(offset, ceilingTop);
- } else {
- /*
- it is a new born range
- */
- navigableMap.put(offset, offset);
- }
- LOG.debug("ack {} in {} ns", msgId, System.nanoTime() - start);
- LOG.debug("{}", ackRegistry.get(consumerRecordMessageId.getTopicPartition()));
+ return Collections.emptyList();
+ }
+
+ private void registerConsumerMetrics() {
+ consumer.metrics().forEach((metricName, o) -> {
+ if (!reportedMetrics.contains(metricName)) {
+ reportedMetrics.add(metricName);
+ String exposedName = extractKafkaMetricName(metricName);
+ LOG.info("register Kakfa Consumer metric {}", exposedName);
+ topologyContext.registerMetric(exposedName, new
KafkaMetricDecorator<>(o),
+ metricsIntervalInSecs);
+ }
+ });
+ }
+
+ private String extractKafkaMetricName(MetricName metricName) {
+ StringBuilder builder = new StringBuilder()
+ .append(metricName.name())
+ .append('-')
+ .append(metricName.group());
+ metricName.tags().forEach((s, s2) -> builder.append('-')
+ .append(s)
+ .append('-')
+ .append(s2));
+ LOG.info("register Kakfa Consumer metric {}", builder);
+ return builder.toString();
+ }
+
+ static class ConsumerRecordMessageId {
+ private TopicPartition topicPartition;
+ private long offset;
+
+ ConsumerRecordMessageId(TopicPartition topicPartition, long offset) {
+ this.topicPartition = topicPartition;
+ this.offset = offset;
}
@Override
- public void fail(Object msgId) {
- super.fail(msgId);
- ConsumerRecordMessageId consumerRecordMessageId = (ConsumerRecordMessageId) msgId;
- TopicPartition topicPartition = consumerRecordMessageId.getTopicPartition();
- long offset = consumerRecordMessageId.getOffset();
- failureRegistry.put(topicPartition, Math.min(failureRegistry.getOrDefault(topicPartition, Long.MAX_VALUE), offset));
- LOG.warn("fail {}", msgId);
+ public String toString() {
+ return "ConsumerRecordMessageId{"
+ + "topicPartition=" + topicPartition
+ + ", offset=" + offset
+ + '}';
}
- @Override
- public void close() {
- consumer.close();
+ TopicPartition getTopicPartition() {
+ return topicPartition;
}
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- consumerRecordTransformer.getOutputStreams()
- .forEach(s -> declarer.declareStream(s, new
Fields(consumerRecordTransformer.getFieldNames(s))));
+ long getOffset() {
+ return offset;
}
- private void emitConsumerRecord(ConsumerRecord<K, V> record) {
- consumerRecordTransformer.transform(record)
- .forEach((s, objects) -> {
- if (topologyReliabilityMode !=
Config.TopologyReliabilityMode.ATLEAST_ONCE) {
- collector.emit(s, objects);
- //only in effective once mode, we need to track the
offset of the record that is just emitted into the topology
- if (topologyReliabilityMode ==
Config.TopologyReliabilityMode.EFFECTIVELY_ONCE) {
- state.put(new TopicPartition(record.topic(),
record.partition()), record.offset());
- }
- } else {
- //build message id based on topic, partition, offset
of the consumer record
- ConsumerRecordMessageId consumerRecordMessageId = new
ConsumerRecordMessageId(new TopicPartition(record.topic(), record.partition()),
record.offset());
- //emit tuple with the message id
- collector.emit(s, objects, consumerRecordMessageId);
- }
- });
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ConsumerRecordMessageId that = (ConsumerRecordMessageId) o;
+
+ if (offset != that.offset) {
+ return false;
+ }
+ return topicPartition.equals(that.topicPartition);
}
- private void rewindAndDiscardAck(TopicPartition topicPartition,
NavigableMap<Long, Long> ackRanges) {
- if (failureRegistry.containsKey(topicPartition)) {
- long earliestFailedOffset = failureRegistry.get(topicPartition);
- //rewind back to the earliest failed offset
- consumer.seek(topicPartition, earliestFailedOffset);
- //discard the ack whose offset is greater than the earliest failed
offset if there is any because we've rewound the consumer back
- SortedMap<Long, Long> sortedMap =
ackRanges.headMap(earliestFailedOffset);
- if (!sortedMap.isEmpty()) {
- sortedMap.put(sortedMap.lastKey(),
Math.min(earliestFailedOffset, sortedMap.get(sortedMap.lastKey())));
- }
- ackRegistry.put(topicPartition, new
ConcurrentSkipListMap<>(sortedMap));
- //failure for this partition has been dealt with
- failureRegistry.remove(topicPartition);
- }
+ @Override
+ public int hashCode() {
+ int result = topicPartition.hashCode();
+ result = 31 * result + (int) (offset ^ (offset >>> 32));
+ return result;
}
+ }
- private void manualCommit(TopicPartition topicPartition,
NavigableMap<Long, Long> ackRanges) {
- //the first entry in the acknowledgement registry keeps track of the
lowest possible offset that can be committed
- Map.Entry<Long, Long> firstEntry = ackRanges.firstEntry();
- if (firstEntry != null) {
- consumer.commitAsync(Collections.singletonMap(topicPartition, new
OffsetAndMetadata(firstEntry.getValue() + 1)), null);
- }
- }
+ class KafkaConsumerRebalanceListener implements ConsumerRebalanceListener {
- private Iterable<ConsumerRecord<K, V>> poll() {
- ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(200));
- if (!records.isEmpty()) {
- /*
- since the Kafka Consumer metrics are built gradually based on the
partitions it consumes,
- we need to periodically check whether there's any new metrics to
register after each polling.
- */
- if (System.currentTimeMillis() -
previousKafkaMetricsUpdatedTimestamp > metricsIntervalInSecs) {
- registerConsumerMetrics();
- previousKafkaMetricsUpdatedTimestamp =
System.currentTimeMillis();
- }
- if (topologyReliabilityMode ==
Config.TopologyReliabilityMode.ATMOST_ONCE) {
- consumer.commitAsync();
- }
- return records;
- }
- return Collections.emptyList();
- }
-
- private void registerConsumerMetrics() {
- consumer.metrics().forEach((metricName, o) -> {
- if (!reportedMetrics.contains(metricName)) {
- reportedMetrics.add(metricName);
- String exposedName = extractKafkaMetricName(metricName);
- LOG.info("register Kakfa Consumer metric {}", exposedName);
- topologyContext.registerMetric(exposedName, new
KafkaMetricDecorator<>(o), metricsIntervalInSecs);
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> collection) {
+ assignedPartitions.removeAll(collection);
+ if (topologyReliabilityMode ==
Config.TopologyReliabilityMode.ATLEAST_ONCE) {
+ collection.forEach(topicPartition -> {
+ NavigableMap<Long, Long> navigableMap =
ackRegistry.remove(topicPartition);
+ if (navigableMap != null) {
+ Map.Entry<Long, Long> entry = navigableMap.firstEntry();
+ if (entry != null) {
+ consumer.commitAsync(Collections.singletonMap(topicPartition,
+ new OffsetAndMetadata(Math.min(
+ failureRegistry.getOrDefault(topicPartition,
Long.MAX_VALUE),
+ entry.getValue()) + 1)), null);
}
+ }
+ failureRegistry.remove(topicPartition);
});
+ } else if (topologyReliabilityMode ==
Config.TopologyReliabilityMode.EFFECTIVELY_ONCE) {
+ collection.forEach(topicPartition -> state.remove(topicPartition));
+ }
}
- private String extractKafkaMetricName(MetricName metricName) {
- StringBuilder builder = new StringBuilder()
- .append(metricName.name())
- .append('-')
- .append(metricName.group());
- metricName.tags().forEach((s, s2) -> builder.append('-')
- .append(s)
- .append('-')
- .append(s2));
- LOG.info("register Kakfa Consumer metric {}", builder);
- return builder.toString();
- }
-
- static class ConsumerRecordMessageId {
- private TopicPartition topicPartition;
- private long offset;
-
- ConsumerRecordMessageId(TopicPartition topicPartition, long offset) {
- this.topicPartition = topicPartition;
- this.offset = offset;
- }
-
- @Override
- public String toString() {
- return "ConsumerRecordMessageId{" +
- "topicPartition=" + topicPartition +
- ", offset=" + offset +
- '}';
- }
-
- TopicPartition getTopicPartition() {
- return topicPartition;
- }
-
- long getOffset() {
- return offset;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- ConsumerRecordMessageId that = (ConsumerRecordMessageId) o;
-
- if (offset != that.offset) return false;
- return topicPartition.equals(that.topicPartition);
- }
-
- @Override
- public int hashCode() {
- int result = topicPartition.hashCode();
- result = 31 * result + (int) (offset ^ (offset >>> 32));
- return result;
- }
- }
-
- class KafkaConsumerRebalanceListener implements ConsumerRebalanceListener {
-
- @Override
- public void onPartitionsRevoked(Collection<TopicPartition> collection)
{
- assignedPartitions.removeAll(collection);
- if (topologyReliabilityMode ==
Config.TopologyReliabilityMode.ATLEAST_ONCE) {
- collection.forEach(topicPartition -> {
- NavigableMap<Long, Long> navigableMap =
ackRegistry.remove(topicPartition);
- if (navigableMap != null) {
- Map.Entry<Long, Long> entry =
navigableMap.firstEntry();
- if (entry != null) {
-
consumer.commitAsync(Collections.singletonMap(topicPartition, new
OffsetAndMetadata(Math.min(failureRegistry.getOrDefault(topicPartition,
Long.MAX_VALUE), entry.getValue()) + 1)), null);
- }
- }
- failureRegistry.remove(topicPartition);
- });
- } else if (topologyReliabilityMode ==
Config.TopologyReliabilityMode.EFFECTIVELY_ONCE) {
- collection.forEach(topicPartition ->
state.remove(topicPartition));
- }
- }
-
- @Override
- public void onPartitionsAssigned(Collection<TopicPartition>
collection) {
- assignedPartitions.addAll(collection);
- if (topologyReliabilityMode ==
Config.TopologyReliabilityMode.ATLEAST_ONCE) {
- collection.forEach(topicPartition -> {
- try {
- long nextRecordPosition =
consumer.position(topicPartition, Duration.ofSeconds(5));
- ackRegistry.put(topicPartition, new
ConcurrentSkipListMap<>(Collections.singletonMap(nextRecordPosition - 1,
nextRecordPosition - 1)));
- } catch (TimeoutException e) {
- LOG.warn("can not get the position of the next record
to consume for partition {}", topicPartition);
- ackRegistry.remove(topicPartition);
- }
- failureRegistry.remove(topicPartition);
- });
- } else if (topologyReliabilityMode ==
Config.TopologyReliabilityMode.EFFECTIVELY_ONCE) {
- collection.forEach(topicPartition -> {
- if (state.containsKey(topicPartition)) {
- consumer.seek(topicPartition,
state.get(topicPartition));
- }
- });
- }
- }
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> collection) {
+ assignedPartitions.addAll(collection);
+ if (topologyReliabilityMode ==
Config.TopologyReliabilityMode.ATLEAST_ONCE) {
+ collection.forEach(topicPartition -> {
+ try {
+ long nextRecordPosition = consumer.position(topicPartition,
+ Duration.ofSeconds(5));
+ ackRegistry.put(topicPartition, new ConcurrentSkipListMap<>(
+ Collections.singletonMap(nextRecordPosition - 1,
nextRecordPosition - 1)
+ ));
+ } catch (TimeoutException e) {
+          LOG.warn("cannot get the position of the next record to consume
for partition {}",
+ topicPartition);
+ ackRegistry.remove(topicPartition);
+ }
+ failureRegistry.remove(topicPartition);
+ });
+ } else if (topologyReliabilityMode ==
Config.TopologyReliabilityMode.EFFECTIVELY_ONCE) {
+ collection.forEach(topicPartition -> {
+ if (state.containsKey(topicPartition)) {
+ consumer.seek(topicPartition, state.get(topicPartition));
+ }
+ });
+ }
}
+ }
}
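For readers following the at-least-once path above: per partition, the spout keeps
acknowledged offsets as disjoint inclusive ranges in a NavigableMap. An incoming ack
either extends a neighbouring range, bridges two adjacent ranges, or opens a new one,
and one past the first range's upper bound is the only offset that is safe to commit.
A minimal standalone sketch of that bookkeeping (illustrative only, not part of this
patch; the class and method names are made up):

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Per-partition ack bookkeeping: key = lower bound, value = upper bound
// of a contiguous run of acknowledged offsets.
final class AckRanges {
  private final NavigableMap<Long, Long> ranges = new ConcurrentSkipListMap<>();

  void ack(long offset) {
    Map.Entry<Long, Long> floor = ranges.floorEntry(offset);
    Map.Entry<Long, Long> ceiling = ranges.ceilingEntry(offset);
    long floorBottom = floor != null ? floor.getKey() : Long.MIN_VALUE;
    long floorTop = floor != null ? floor.getValue() : Long.MIN_VALUE;
    long ceilingBottom = ceiling != null ? ceiling.getKey() : Long.MAX_VALUE;
    long ceilingTop = ceiling != null ? ceiling.getValue() : Long.MAX_VALUE;
    if ((offset >= floorBottom && offset <= floorTop)
        || (offset >= ceilingBottom && offset <= ceilingTop)) {
      return; // duplicate ack of a replayed message
    }
    if (ceilingBottom - floorTop == 2) {
      ranges.put(floorBottom, ceilingTop); // the ack bridges the two adjacent ranges
      ranges.remove(ceilingBottom);
    } else if (offset == floorTop + 1) {
      ranges.put(floorBottom, offset); // extends the floor range upward
    } else if (offset == ceilingBottom - 1) {
      ranges.remove(ceilingBottom); // extends the ceiling range downward
      ranges.put(offset, ceilingTop);
    } else {
      ranges.put(offset, offset); // opens a brand-new range
    }
  }

  // One past the first range's upper bound is the highest offset safe to commit.
  long committableOffset() {
    Map.Entry<Long, Long> first = ranges.firstEntry();
    return first == null ? -1L : first.getValue() + 1;
  }
}
```

With acks for offsets 0, 1, 3 and 4 (offset 2 still outstanding) the map holds
{0=1, 3=4} and committableOffset() returns 2, matching the OffsetAndMetadata(2)
that the ack test in KafkaSpoutTest below verifies.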
diff --git
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/TopicPatternProvider.java
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/TopicPatternProvider.java
index 1448609..43d6c6a 100644
---
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/TopicPatternProvider.java
+++
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/TopicPatternProvider.java
@@ -1,12 +1,14 @@
-/*
- * Copyright 2019
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -26,8 +28,10 @@ import java.util.regex.Pattern;
*/
public interface TopicPatternProvider extends Serializable {
- /**
- * @return a matching pattern for topics to subscribe to
- */
- Pattern create();
+ /**
+   * Creates the topic-matching regex pattern.
+ *
+ * @return a matching pattern for topics to subscribe to
+ */
+ Pattern create();
}
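A TopicPatternProvider is just a serializable factory for the subscription regex;
KafkaSpout accepts either a fixed topic collection or a provider (both constructors
are exercised in KafkaSpoutTest below). As a sketch against only the interface shown
in this hunk, a provider that subscribes to every topic sharing a prefix could look
like this (the class name and prefix are illustrative, not part of the patch):

```java
import java.util.regex.Pattern;

// Illustrative provider: subscribe to every topic that starts with a fixed prefix.
public class PrefixTopicPatternProvider implements TopicPatternProvider {
  private static final long serialVersionUID = 1L;
  private final String prefix;

  public PrefixTopicPatternProvider(String prefix) {
    this.prefix = prefix;
  }

  @Override
  public Pattern create() {
    // Quote the prefix so any regex metacharacters in it match literally.
    return Pattern.compile(Pattern.quote(prefix) + ".*");
  }
}
```

It would be passed in the same way the tests pass DefaultTopicPatternProvider,
e.g. new KafkaSpout<>(consumerFactory, new PrefixTopicPatternProvider("events-")).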
diff --git
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/BUILD
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/BUILD
new file mode 100644
index 0000000..1d9c400
--- /dev/null
+++
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/BUILD
@@ -0,0 +1,44 @@
+heron_kafka_spouts_test_dep = [
+
"//contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java:heron-kafka-spout-java",
+ "//heron/api/src/java:api-java-low-level",
+ "//heron/common/src/java:basics-java",
+ "//heron/common/src/java:config-java",
+ "//third_party/java:junit4",
+ "@org_apache_kafka_kafka_clients//jar",
+ "@org_mockito_mockito_all//jar",
+]
+
+java_test(
+ name = "KafkaSpoutTest",
+ srcs = ["org/apache/heron/spouts/kafka/KafkaSpoutTest.java"],
+ test_class = "org.apache.heron.spouts.kafka.KafkaSpoutTest",
+ deps = heron_kafka_spouts_test_dep,
+)
+
+java_test(
+ name = "KafkaMetricDecoratorTest",
+ srcs = ["org/apache/heron/spouts/kafka/KafkaMetricDecoratorTest.java"],
+ test_class = "org.apache.heron.spouts.kafka.KafkaMetricDecoratorTest",
+ deps = heron_kafka_spouts_test_dep,
+)
+
+java_test(
+ name = "DefaultTopicPatternProviderTest",
+ srcs =
["org/apache/heron/spouts/kafka/DefaultTopicPatternProviderTest.java"],
+ test_class =
"org.apache.heron.spouts.kafka.DefaultTopicPatternProviderTest",
+ deps = heron_kafka_spouts_test_dep,
+)
+
+java_test(
+ name = "DefaultKafkaConsumerFactoryTest",
+ srcs =
["org/apache/heron/spouts/kafka/DefaultKafkaConsumerFactoryTest.java"],
+ test_class =
"org.apache.heron.spouts.kafka.DefaultKafkaConsumerFactoryTest",
+ deps = heron_kafka_spouts_test_dep,
+)
+
+java_test(
+ name = "DefaultConsumerRecordTransformerTest",
+ srcs =
["org/apache/heron/spouts/kafka/DefaultConsumerRecordTransformerTest.java"],
+ test_class =
"org.apache.heron.spouts.kafka.DefaultConsumerRecordTransformerTest",
+ deps = heron_kafka_spouts_test_dep,
+)
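With these targets in place, the suites can also be run individually from a
configured workspace, e.g.
bazel test //contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java:KafkaSpoutTest
while CI picks them up through the contrib/... wildcard added to
scripts/travis/build.sh below.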
diff --git
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultConsumerRecordTransformerTest.java
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultConsumerRecordTransformerTest.java
index c9d676e..69158b6 100644
---
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultConsumerRecordTransformerTest.java
+++
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultConsumerRecordTransformerTest.java
@@ -1,12 +1,14 @@
-/*
- * Copyright 2019
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,40 +18,45 @@
package org.apache.heron.spouts.kafka;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-class DefaultConsumerRecordTransformerTest {
- private static final String DEFAULT_STREAM = "default";
- private ConsumerRecordTransformer<String, byte[]>
consumerRecordTransformer;
-
- @BeforeEach
- void setUp() {
- consumerRecordTransformer = new DefaultConsumerRecordTransformer<>();
- }
-
- @Test
- void getOutputStreams() {
- assertEquals(Collections.singletonList(DEFAULT_STREAM),
consumerRecordTransformer.getOutputStreams());
- }
-
- @Test
- void getFieldNames() {
- assertEquals(Arrays.asList("key", "value"),
consumerRecordTransformer.getFieldNames(DEFAULT_STREAM));
- }
-
- @Test
- void transform() {
- ConsumerRecord<String, byte[]> consumerRecord = new
ConsumerRecord<>("partition", 0, 0, "key", new byte[]{0x1, 0x2, 0x3});
- Map<String, List<Object>> expected =
Collections.singletonMap(DEFAULT_STREAM, Arrays.asList(consumerRecord.key(),
consumerRecord.value()));
- assertEquals(expected,
consumerRecordTransformer.transform(consumerRecord));
- }
-}
\ No newline at end of file
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import static org.junit.Assert.assertEquals;
+
+public class DefaultConsumerRecordTransformerTest {
+ private static final String DEFAULT_STREAM = "default";
+ private ConsumerRecordTransformer<String, byte[]> consumerRecordTransformer;
+
+ @Before
+ public void setUp() {
+ consumerRecordTransformer = new DefaultConsumerRecordTransformer<>();
+ }
+
+ @Test
+ public void getOutputStreams() {
+ assertEquals(Collections.singletonList(DEFAULT_STREAM),
+ consumerRecordTransformer.getOutputStreams());
+ }
+
+ @Test
+ public void getFieldNames() {
+ assertEquals(Arrays.asList("key", "value"),
+ consumerRecordTransformer.getFieldNames(DEFAULT_STREAM));
+ }
+
+ @Test
+ public void transform() {
+ ConsumerRecord<String, byte[]> consumerRecord = new
ConsumerRecord<>("partition", 0,
+ 0, "key", new byte[]{0x1, 0x2, 0x3});
+ Map<String, List<Object>> expected =
Collections.singletonMap(DEFAULT_STREAM,
+ Arrays.asList(consumerRecord.key(), consumerRecord.value()));
+ assertEquals(expected,
consumerRecordTransformer.transform(consumerRecord));
+ }
+}
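This rewritten test pins down the ConsumerRecordTransformer contract: the output
stream ids, the field names per stream, and a transform that maps a ConsumerRecord
to per-stream tuple values. As an illustration (not part of the patch, with the
generics assumed to match the default implementation), a transformer that also
surfaces the record's topic and offset might look like this:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;

// Illustrative transformer: emit topic and offset alongside key and value
// on a single "audit" stream.
public class AuditingRecordTransformer<K, V> implements ConsumerRecordTransformer<K, V> {
  private static final String STREAM = "audit";

  @Override
  public List<String> getOutputStreams() {
    return Collections.singletonList(STREAM);
  }

  @Override
  public List<String> getFieldNames(String stream) {
    return Arrays.asList("topic", "offset", "key", "value");
  }

  @Override
  public Map<String, List<Object>> transform(ConsumerRecord<K, V> record) {
    return Collections.singletonMap(STREAM,
        Arrays.asList(record.topic(), record.offset(), record.key(), record.value()));
  }
}
```

It would be wired in via kafkaSpout.setConsumerRecordTransformer(...), the setter
covered by KafkaSpoutTest below.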
diff --git
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultKafkaConsumerFactoryTest.java
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultKafkaConsumerFactoryTest.java
index 205053c..1a6a187 100644
---
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultKafkaConsumerFactoryTest.java
+++
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultKafkaConsumerFactoryTest.java
@@ -1,12 +1,14 @@
-/*
- * Copyright 2019
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,33 +18,35 @@
package org.apache.heron.spouts.kafka;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
import java.util.HashMap;
import java.util.Map;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
-class DefaultKafkaConsumerFactoryTest {
- private KafkaConsumerFactory<String, byte[]> kafkaConsumerFactory;
+import static org.junit.Assert.assertTrue;
- @BeforeEach
- void setUp() {
- Map<String, Object> config = new HashMap<>();
- config.put("bootstrap.servers", "localhost:9092");
- config.put("group.id", "tower-kafka-spout");
- config.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
- config.put("value.deserializer",
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
- kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(config);
- }
+public class DefaultKafkaConsumerFactoryTest {
+ private KafkaConsumerFactory<String, byte[]> kafkaConsumerFactory;
+
+ @Before
+ public void setUp() {
+ Map<String, Object> config = new HashMap<>();
+ config.put("bootstrap.servers", "localhost:9092");
+ config.put("group.id", "tower-kafka-spout");
+ config.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
+ config.put("value.deserializer",
+ "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(config);
+ }
- @Test
- void create() {
- try (Consumer<String, byte[]> consumer =
kafkaConsumerFactory.create()) {
- assertTrue(consumer instanceof KafkaConsumer);
- }
+ @Test
+ public void create() {
+ try (Consumer<String, byte[]> consumer = kafkaConsumerFactory.create()) {
+ assertTrue(consumer instanceof KafkaConsumer);
}
-}
\ No newline at end of file
+ }
+}
diff --git
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultTopicPatternProviderTest.java
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultTopicPatternProviderTest.java
index 21449e4..118426f 100644
---
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultTopicPatternProviderTest.java
+++
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultTopicPatternProviderTest.java
@@ -1,12 +1,14 @@
-/*
- * Copyright 2019
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,16 +18,17 @@
package org.apache.heron.spouts.kafka;
-import org.junit.jupiter.api.Test;
-
import java.util.regex.Pattern;
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
-class DefaultTopicPatternProviderTest {
+public class DefaultTopicPatternProviderTest {
- @Test
- void create() {
- assertEquals(Pattern.compile("a").pattern(), new
DefaultTopicPatternProvider("a").create().pattern());
- }
-}
\ No newline at end of file
+ @Test
+ public void create() {
+ assertEquals(Pattern.compile("a").pattern(),
+ new DefaultTopicPatternProvider("a").create().pattern());
+ }
+}
diff --git
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/KafkaMetricDecoratorTest.java
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/KafkaMetricDecoratorTest.java
index d1bb516..35bd956 100644
---
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/KafkaMetricDecoratorTest.java
+++
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/KafkaMetricDecoratorTest.java
@@ -1,12 +1,14 @@
-/*
- * Copyright 2019
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,28 +18,29 @@
package org.apache.heron.spouts.kafka;
-import org.apache.kafka.common.Metric;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import org.apache.kafka.common.Metric;
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.when;
-@ExtendWith(MockitoExtension.class)
-class KafkaMetricDecoratorTest {
- @Mock
- private Metric metric;
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaMetricDecoratorTest {
+ @Mock
+ private Metric metric;
- @BeforeEach
- void setUp() {
- when(metric.metricValue()).thenReturn("dummy value");
- }
+ @Before
+ public void setUp() {
+ when(metric.metricValue()).thenReturn("dummy value");
+ }
- @Test
- void getValueAndReset() {
- assertEquals("dummy value", new
KafkaMetricDecorator<>(metric).getValueAndReset());
- }
-}
\ No newline at end of file
+ @Test
+ public void getValueAndReset() {
+ assertEquals("dummy value", new
KafkaMetricDecorator<>(metric).getValueAndReset());
+ }
+}
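The decorator under test is a thin adapter: it exposes a Kafka Metric as a Heron
IMetric so that registerConsumerMetrics can hand consumer gauges to the
TopologyContext. Functionally it reduces to something like the following sketch
(the class name is made up and the exact signature is an assumption drawn from
this test):

```java
import org.apache.heron.api.metric.IMetric;
import org.apache.kafka.common.Metric;

// Illustrative reduction of the adapter: forward the Kafka metric's current
// value; Kafka consumer metrics are gauges, so there is nothing to reset.
public class MetricAdapterSketch implements IMetric<Object> {
  private final Metric metric;

  public MetricAdapterSketch(Metric metric) {
    this.metric = metric;
  }

  @Override
  public Object getValueAndReset() {
    return metric.metricValue();
  }
}
```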
diff --git
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/KafkaSpoutTest.java
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/KafkaSpoutTest.java
index 3fa026e..dce386a 100644
---
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/KafkaSpoutTest.java
+++
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/KafkaSpoutTest.java
@@ -1,12 +1,14 @@
-/*
- * Copyright 2019
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,247 +18,299 @@
package org.apache.heron.spouts.kafka;
-import com.twitter.heron.api.Config;
-import com.twitter.heron.api.metric.IMetric;
-import com.twitter.heron.api.spout.SpoutOutputCollector;
-import com.twitter.heron.api.topology.OutputFieldsDeclarer;
-import com.twitter.heron.api.topology.TopologyContext;
-import com.twitter.heron.api.tuple.Fields;
-import com.twitter.heron.common.basics.SingletonRegistry;
-import com.twitter.heron.common.config.SystemConfig;
-import com.twitter.heron.common.config.SystemConfigKey;
-import org.apache.kafka.clients.consumer.*;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.TopicPartition;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.regex.Pattern;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.runners.MockitoJUnitRunner;
-import java.time.Duration;
-import java.util.*;
-import java.util.regex.Pattern;
+import org.apache.heron.api.Config;
+import org.apache.heron.api.metric.IMetric;
+import org.apache.heron.api.spout.SpoutOutputCollector;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Fields;
+import org.apache.heron.common.basics.SingletonRegistry;
+import org.apache.heron.common.config.SystemConfig;
+import org.apache.heron.common.config.SystemConfigKey;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
-import static
com.twitter.heron.api.Config.TopologyReliabilityMode.ATLEAST_ONCE;
-import static com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE;
-import static org.junit.jupiter.api.Assertions.*;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.*;
+import static org.apache.heron.api.Config.TopologyReliabilityMode.ATLEAST_ONCE;
+import static org.apache.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE;
-@ExtendWith(MockitoExtension.class)
-class KafkaSpoutTest {
- private static final Random random = new Random();
- private static final String DUMMY_TOPIC_NAME = "topic";
- private KafkaSpout<String, byte[]> kafkaSpout;
- @Mock
- private KafkaConsumerFactory<String, byte[]> kafkaConsumerFactory;
- @Mock
- private Consumer<String, byte[]> consumer;
- @Mock
- private TopologyContext topologyContext;
- @Mock
- private SpoutOutputCollector collector;
- @Mock
- private Metric metric;
- @Captor
- private ArgumentCaptor<Pattern> patternArgumentCaptor;
- @Captor
- private ArgumentCaptor<IMetric<Object>> kafkaMetricDecoratorArgumentCaptor;
- @Mock
- private OutputFieldsDeclarer declarer;
- @Captor
- private ArgumentCaptor<Fields> fieldsArgumentCaptor;
- @Captor
- private ArgumentCaptor<List<Object>> listArgumentCaptor;
- @Captor
- private ArgumentCaptor<ConsumerRebalanceListener>
consumerRebalanceListenerArgumentCaptor;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
- @BeforeAll
- static void setUpAll() {
- if
(!SingletonRegistry.INSTANCE.containsSingleton(SystemConfig.HERON_SYSTEM_CONFIG))
{
-
SingletonRegistry.INSTANCE.registerSingleton(SystemConfig.HERON_SYSTEM_CONFIG,
SystemConfig.newBuilder(true)
- .put(SystemConfigKey.HERON_METRICS_EXPORT_INTERVAL, 60)
- .build());
- }
- }
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
- @BeforeEach
- void setUp() {
- kafkaSpout = new KafkaSpout<>(kafkaConsumerFactory,
Collections.singleton(DUMMY_TOPIC_NAME));
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaSpoutTest {
+ private static final Random RANDOM = new Random();
+ private static final String DUMMY_TOPIC_NAME = "topic";
+ private KafkaSpout<String, byte[]> kafkaSpout;
+ @Mock
+ private KafkaConsumerFactory<String, byte[]> kafkaConsumerFactory;
+ @Mock
+ private Consumer<String, byte[]> consumer;
+ @Mock
+ private TopologyContext topologyContext;
+ @Mock
+ private SpoutOutputCollector collector;
+ @Mock
+ private Metric metric;
+ @Captor
+ private ArgumentCaptor<Pattern> patternArgumentCaptor;
+ @Captor
+ private ArgumentCaptor<IMetric<Object>> kafkaMetricDecoratorArgumentCaptor;
+ @Mock
+ private OutputFieldsDeclarer declarer;
+ @Captor
+ private ArgumentCaptor<Fields> fieldsArgumentCaptor;
+ @Captor
+ private ArgumentCaptor<List<Object>> listArgumentCaptor;
+ @Captor
+ private ArgumentCaptor<ConsumerRebalanceListener>
consumerRebalanceListenerArgumentCaptor;
+
+ @BeforeClass
+ public static void setUpAll() {
+ if
(!SingletonRegistry.INSTANCE.containsSingleton(SystemConfig.HERON_SYSTEM_CONFIG))
{
+
SingletonRegistry.INSTANCE.registerSingleton(SystemConfig.HERON_SYSTEM_CONFIG,
+ SystemConfig.newBuilder(true)
+ .put(SystemConfigKey.HERON_METRICS_EXPORT_INTERVAL, 60)
+ .build());
}
+ }
- @Test
- void getConsumerRecordTransformer() {
- assertTrue(kafkaSpout.getConsumerRecordTransformer() instanceof
DefaultConsumerRecordTransformer);
+ @Before
+ public void setUp() {
+ kafkaSpout = new KafkaSpout<>(kafkaConsumerFactory,
+ Collections.singleton(DUMMY_TOPIC_NAME));
+ }
- }
+ @Test
+ public void getConsumerRecordTransformer() {
+ assertTrue(kafkaSpout.getConsumerRecordTransformer()
+ instanceof DefaultConsumerRecordTransformer);
- @Test
- void setConsumerRecordTransformer() {
- ConsumerRecordTransformer<String, byte[]> consumerRecordTransformer =
new DefaultConsumerRecordTransformer<>();
- kafkaSpout.setConsumerRecordTransformer(consumerRecordTransformer);
- assertEquals(consumerRecordTransformer,
kafkaSpout.getConsumerRecordTransformer());
- }
+ }
- @Test
- void open() {
- when(kafkaConsumerFactory.create()).thenReturn(consumer);
+ @Test
+ public void setConsumerRecordTransformer() {
+ ConsumerRecordTransformer<String, byte[]> consumerRecordTransformer =
+ new DefaultConsumerRecordTransformer<>();
+ kafkaSpout.setConsumerRecordTransformer(consumerRecordTransformer);
+ assertEquals(consumerRecordTransformer,
kafkaSpout.getConsumerRecordTransformer());
+ }
-
kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE,
ATMOST_ONCE.name()), topologyContext, collector);
-
verify(consumer).subscribe(eq(Collections.singleton(DUMMY_TOPIC_NAME)),
any(KafkaSpout.KafkaConsumerRebalanceListener.class));
+ @Test
+ public void open() {
+ when(kafkaConsumerFactory.create()).thenReturn(consumer);
- kafkaSpout = new KafkaSpout<>(kafkaConsumerFactory, new
DefaultTopicPatternProvider("a"));
-
kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE,
ATMOST_ONCE.name()), topologyContext, collector);
- verify(consumer).subscribe(patternArgumentCaptor.capture(),
any(KafkaSpout.KafkaConsumerRebalanceListener.class));
- assertEquals("a", patternArgumentCaptor.getValue().pattern());
- }
+ kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE,
+ ATMOST_ONCE.name()), topologyContext, collector);
+ verify(consumer).subscribe(eq(Collections.singleton(DUMMY_TOPIC_NAME)),
+ any(KafkaSpout.KafkaConsumerRebalanceListener.class));
- @Test
- void nextTuple() {
- when(kafkaConsumerFactory.create()).thenReturn(consumer);
- ConsumerRecords<String, byte[]> consumerRecords = new
ConsumerRecords<>(Collections.singletonMap(new TopicPartition(DUMMY_TOPIC_NAME,
0), Collections.singletonList(new ConsumerRecord<>(DUMMY_TOPIC_NAME, 0, 0,
"key", new byte[]{0xF}))));
- when(consumer.poll(any(Duration.class))).thenReturn(consumerRecords);
- doReturn(Collections.singletonMap(new MetricName("name", "group",
"description", Collections.singletonMap("name", "value")),
metric)).when(consumer).metrics();
- when(metric.metricValue()).thenReturn("sample value");
+ kafkaSpout = new KafkaSpout<>(kafkaConsumerFactory,
+ new DefaultTopicPatternProvider("a"));
+ kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE,
+ ATMOST_ONCE.name()), topologyContext, collector);
+ verify(consumer).subscribe(patternArgumentCaptor.capture(),
+ any(KafkaSpout.KafkaConsumerRebalanceListener.class));
+ assertEquals("a", patternArgumentCaptor.getValue().pattern());
+ }
-
kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE,
ATMOST_ONCE.name()), topologyContext, collector);
-
verify(consumer).subscribe(eq(Collections.singleton(DUMMY_TOPIC_NAME)),
consumerRebalanceListenerArgumentCaptor.capture());
- ConsumerRebalanceListener consumerRebalanceListener =
consumerRebalanceListenerArgumentCaptor.getValue();
- TopicPartition topicPartition = new TopicPartition(DUMMY_TOPIC_NAME,
0);
-
consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(topicPartition));
+ @Test
+ public void nextTuple() {
+ when(kafkaConsumerFactory.create()).thenReturn(consumer);
+ ConsumerRecords<String, byte[]> consumerRecords = new ConsumerRecords<>(
+ Collections.singletonMap(new TopicPartition(DUMMY_TOPIC_NAME, 0),
+ Collections.singletonList(new ConsumerRecord<>(DUMMY_TOPIC_NAME,
0, 0,
+ "key", new byte[]{0xF}))));
+ when(consumer.poll(any(Duration.class))).thenReturn(consumerRecords);
+ doReturn(Collections.singletonMap(new MetricName("name", "group",
"description",
+ Collections.singletonMap("name", "value")),
metric)).when(consumer).metrics();
+ when(metric.metricValue()).thenReturn("sample value");
- kafkaSpout.nextTuple();
- verify(consumer).commitAsync();
- verify(topologyContext).registerMetric(eq("name-group-name-value"),
kafkaMetricDecoratorArgumentCaptor.capture(), eq(60));
- assertEquals("sample value",
kafkaMetricDecoratorArgumentCaptor.getValue().getValueAndReset());
+ kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE,
+ ATMOST_ONCE.name()), topologyContext, collector);
+ verify(consumer).subscribe(eq(Collections.singleton(DUMMY_TOPIC_NAME)),
+ consumerRebalanceListenerArgumentCaptor.capture());
+ ConsumerRebalanceListener consumerRebalanceListener =
+ consumerRebalanceListenerArgumentCaptor.getValue();
+ TopicPartition topicPartition = new TopicPartition(DUMMY_TOPIC_NAME, 0);
+
consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(topicPartition));
- kafkaSpout.nextTuple();
- verify(collector).emit(eq("default"), listArgumentCaptor.capture());
- assertEquals("key", listArgumentCaptor.getValue().get(0));
- assertArrayEquals(new byte[]{0xF}, (byte[])
listArgumentCaptor.getValue().get(1));
- }
+ kafkaSpout.nextTuple();
+ verify(consumer).commitAsync();
+ verify(topologyContext).registerMetric(eq("name-group-name-value"),
+ kafkaMetricDecoratorArgumentCaptor.capture(), eq(60));
+ assertEquals("sample value",
+ kafkaMetricDecoratorArgumentCaptor.getValue().getValueAndReset());
- @Test
- void ack() {
- when(kafkaConsumerFactory.create()).thenReturn(consumer);
- TopicPartition topicPartition = new TopicPartition(DUMMY_TOPIC_NAME,
0);
- List<ConsumerRecord<String, byte[]>> recordList = new ArrayList<>();
- byte[] randomBytes = new byte[1];
- for (int i = 0; i < 5; i++) {
- random.nextBytes(randomBytes);
- recordList.add(new ConsumerRecord<>(DUMMY_TOPIC_NAME, 0, i, "key",
Arrays.copyOf(randomBytes, randomBytes.length)));
- }
- ConsumerRecords<String, byte[]> consumerRecords = new
ConsumerRecords<>(Collections.singletonMap(topicPartition, recordList));
- when(consumer.poll(any(Duration.class))).thenReturn(consumerRecords);
+ kafkaSpout.nextTuple();
+ verify(collector).emit(eq("default"), listArgumentCaptor.capture());
+ assertEquals("key", listArgumentCaptor.getValue().get(0));
+ assertArrayEquals(new byte[]{0xF}, (byte[])
listArgumentCaptor.getValue().get(1));
+ }
-
kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE,
ATLEAST_ONCE.name()), topologyContext, collector);
- //poll the topic
- kafkaSpout.nextTuple();
- //emit all of the five records
- for (int i = 0; i < 5; i++) {
- kafkaSpout.nextTuple();
- }
- //ack came in out of order and the third record is not acknowledged
- kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition,
4));
- kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition,
0));
- kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition,
1));
- kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition,
3));
- //commit and poll
- kafkaSpout.nextTuple();
- verify(consumer).commitAsync(Collections.singletonMap(topicPartition,
new OffsetAndMetadata(2)), null);
+ @Test
+ public void ack() {
+ when(kafkaConsumerFactory.create()).thenReturn(consumer);
+ TopicPartition topicPartition = new TopicPartition(DUMMY_TOPIC_NAME, 0);
+ List<ConsumerRecord<String, byte[]>> recordList = new ArrayList<>();
+ byte[] randomBytes = new byte[1];
+ for (int i = 0; i < 5; i++) {
+ RANDOM.nextBytes(randomBytes);
+ recordList.add(new ConsumerRecord<>(DUMMY_TOPIC_NAME, 0, i, "key",
+ Arrays.copyOf(randomBytes, randomBytes.length)));
}
+ ConsumerRecords<String, byte[]> consumerRecords = new ConsumerRecords<>(
+ Collections.singletonMap(topicPartition, recordList));
+ when(consumer.poll(any(Duration.class))).thenReturn(consumerRecords);
- @Test
- void fail() {
- when(kafkaConsumerFactory.create()).thenReturn(consumer);
- TopicPartition topicPartition = new TopicPartition(DUMMY_TOPIC_NAME,
0);
- List<ConsumerRecord<String, byte[]>> recordList = new ArrayList<>();
- byte[] randomBytes = new byte[1];
- for (int i = 0; i < 5; i++) {
- random.nextBytes(randomBytes);
- recordList.add(new ConsumerRecord<>(DUMMY_TOPIC_NAME, 0, i, "key",
Arrays.copyOf(randomBytes, randomBytes.length)));
- }
- ConsumerRecords<String, byte[]> consumerRecords = new
ConsumerRecords<>(Collections.singletonMap(topicPartition, recordList));
- when(consumer.poll(any(Duration.class))).thenReturn(consumerRecords);
-
-
kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE,
ATLEAST_ONCE.name()), topologyContext, collector);
- //poll the topic
- kafkaSpout.nextTuple();
- //emit all of the five records
- for (int i = 0; i < 5; i++) {
- kafkaSpout.nextTuple();
- }
- //ack came in out of order, second and third record fails
- kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition,
4));
- kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition,
0));
- kafkaSpout.fail(new KafkaSpout.ConsumerRecordMessageId(topicPartition,
1));
- kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition,
3));
- kafkaSpout.fail(new KafkaSpout.ConsumerRecordMessageId(topicPartition,
2));
- //commit and poll
- kafkaSpout.nextTuple();
- verify(consumer).seek(topicPartition, 1);
- verify(consumer).commitAsync(Collections.singletonMap(topicPartition,
new OffsetAndMetadata(1)), null);
+ kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE,
+ ATLEAST_ONCE.name()), topologyContext, collector);
+ //poll the topic
+ kafkaSpout.nextTuple();
+ //emit all of the five records
+ for (int i = 0; i < 5; i++) {
+ kafkaSpout.nextTuple();
}
+    //acks arrived out of order and the third record is not acknowledged
+ kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 4));
+ kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 0));
+ kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 1));
+ kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 3));
+ //commit and poll
+ kafkaSpout.nextTuple();
+ verify(consumer).commitAsync(Collections.singletonMap(topicPartition,
+ new OffsetAndMetadata(2)), null);
+ }
- @Test
- void close() {
- when(kafkaConsumerFactory.create()).thenReturn(consumer);
-
kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE,
ATMOST_ONCE.name()), topologyContext, collector);
- kafkaSpout.close();
- verify(consumer).close();
+ @Test
+ public void fail() {
+ when(kafkaConsumerFactory.create()).thenReturn(consumer);
+ TopicPartition topicPartition = new TopicPartition(DUMMY_TOPIC_NAME, 0);
+ List<ConsumerRecord<String, byte[]>> recordList = new ArrayList<>();
+ byte[] randomBytes = new byte[1];
+ for (int i = 0; i < 5; i++) {
+ RANDOM.nextBytes(randomBytes);
+ recordList.add(new ConsumerRecord<>(DUMMY_TOPIC_NAME, 0, i, "key",
+ Arrays.copyOf(randomBytes, randomBytes.length)));
}
+ ConsumerRecords<String, byte[]> consumerRecords = new ConsumerRecords<>(
+ Collections.singletonMap(topicPartition, recordList));
+ when(consumer.poll(any(Duration.class))).thenReturn(consumerRecords);
- @Test
- void declareOutputFields() {
- kafkaSpout.declareOutputFields(declarer);
- verify(declarer).declareStream(eq("default"),
fieldsArgumentCaptor.capture());
- assertEquals(Arrays.asList("key", "value"),
fieldsArgumentCaptor.getValue().toList());
+ kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE,
+ ATLEAST_ONCE.name()), topologyContext, collector);
+ //poll the topic
+ kafkaSpout.nextTuple();
+ //emit all of the five records
+ for (int i = 0; i < 5; i++) {
+ kafkaSpout.nextTuple();
}
+    //acks arrived out of order; the second and third records failed
+ kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 4));
+ kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 0));
+ kafkaSpout.fail(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 1));
+ kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 3));
+ kafkaSpout.fail(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 2));
+ //commit and poll
+ kafkaSpout.nextTuple();
+ verify(consumer).seek(topicPartition, 1);
+ verify(consumer).commitAsync(Collections.singletonMap(topicPartition,
+ new OffsetAndMetadata(1)), null);
+ }
- @Test
- void consumerRebalanceListener() {
- when(kafkaConsumerFactory.create()).thenReturn(consumer);
+ @Test
+ public void close() {
+ when(kafkaConsumerFactory.create()).thenReturn(consumer);
+ kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE,
+ ATMOST_ONCE.name()), topologyContext, collector);
+ kafkaSpout.close();
+ verify(consumer).close();
+ }
-
kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE,
ATLEAST_ONCE.name()), topologyContext, collector);
-
verify(consumer).subscribe(eq(Collections.singleton(DUMMY_TOPIC_NAME)),
consumerRebalanceListenerArgumentCaptor.capture());
- ConsumerRebalanceListener consumerRebalanceListener =
consumerRebalanceListenerArgumentCaptor.getValue();
- TopicPartition topicPartition = new TopicPartition(DUMMY_TOPIC_NAME,
0);
-
consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(topicPartition));
- verify(consumer).position(topicPartition, Duration.ofSeconds(5));
+ @Test
+ public void declareOutputFields() {
+ kafkaSpout.declareOutputFields(declarer);
+ verify(declarer).declareStream(eq("default"),
fieldsArgumentCaptor.capture());
+ assertEquals(Arrays.asList("key", "value"),
fieldsArgumentCaptor.getValue().toList());
+ }
- kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition,
0));
- kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition,
1));
-
consumerRebalanceListener.onPartitionsRevoked(Collections.singleton(topicPartition));
- verify(consumer).commitAsync(Collections.singletonMap(topicPartition,
new OffsetAndMetadata(2)), null);
- }
+ @Test
+ public void consumerRebalanceListener() {
+ when(kafkaConsumerFactory.create()).thenReturn(consumer);
- @Test
- void activate() {
- when(kafkaConsumerFactory.create()).thenReturn(consumer);
-
kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE,
ATMOST_ONCE.name()), topologyContext, collector);
-
verify(consumer).subscribe(eq(Collections.singleton(DUMMY_TOPIC_NAME)),
consumerRebalanceListenerArgumentCaptor.capture());
- ConsumerRebalanceListener consumerRebalanceListener =
consumerRebalanceListenerArgumentCaptor.getValue();
- TopicPartition topicPartition = new TopicPartition(DUMMY_TOPIC_NAME,
0);
-
consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(topicPartition));
- kafkaSpout.activate();
- verify(consumer).resume(Collections.singleton(topicPartition));
- }
+ kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE,
+ ATLEAST_ONCE.name()), topologyContext, collector);
+ verify(consumer).subscribe(eq(Collections.singleton(DUMMY_TOPIC_NAME)),
+ consumerRebalanceListenerArgumentCaptor.capture());
+ ConsumerRebalanceListener consumerRebalanceListener =
+ consumerRebalanceListenerArgumentCaptor.getValue();
+ TopicPartition topicPartition = new TopicPartition(DUMMY_TOPIC_NAME, 0);
+
consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(topicPartition));
+ verify(consumer).position(topicPartition, Duration.ofSeconds(5));
- @Test
- void deactivate() {
- when(kafkaConsumerFactory.create()).thenReturn(consumer);
-
kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE,
ATMOST_ONCE.name()), topologyContext, collector);
-
verify(consumer).subscribe(eq(Collections.singleton(DUMMY_TOPIC_NAME)),
consumerRebalanceListenerArgumentCaptor.capture());
- ConsumerRebalanceListener consumerRebalanceListener =
consumerRebalanceListenerArgumentCaptor.getValue();
- TopicPartition topicPartition = new TopicPartition(DUMMY_TOPIC_NAME,
0);
-
consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(topicPartition));
- kafkaSpout.deactivate();
- verify(consumer).pause(Collections.singleton(topicPartition));
- }
-}
\ No newline at end of file
+ kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 0));
+ kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 1));
+
consumerRebalanceListener.onPartitionsRevoked(Collections.singleton(topicPartition));
+ verify(consumer).commitAsync(Collections.singletonMap(topicPartition,
+ new OffsetAndMetadata(2)), null);
+ }
+
+ @Test
+ public void activate() {
+ when(kafkaConsumerFactory.create()).thenReturn(consumer);
+ kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE,
+ ATMOST_ONCE.name()), topologyContext, collector);
+ verify(consumer).subscribe(eq(Collections.singleton(DUMMY_TOPIC_NAME)),
+ consumerRebalanceListenerArgumentCaptor.capture());
+ ConsumerRebalanceListener consumerRebalanceListener =
+ consumerRebalanceListenerArgumentCaptor.getValue();
+ TopicPartition topicPartition = new TopicPartition(DUMMY_TOPIC_NAME, 0);
+
consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(topicPartition));
+ kafkaSpout.activate();
+ verify(consumer).resume(Collections.singleton(topicPartition));
+ }
+
+ @Test
+ public void deactivate() {
+ when(kafkaConsumerFactory.create()).thenReturn(consumer);
+ kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE,
+ ATMOST_ONCE.name()), topologyContext, collector);
+ verify(consumer).subscribe(eq(Collections.singleton(DUMMY_TOPIC_NAME)),
+ consumerRebalanceListenerArgumentCaptor.capture());
+ ConsumerRebalanceListener consumerRebalanceListener =
+ consumerRebalanceListenerArgumentCaptor.getValue();
+ TopicPartition topicPartition = new TopicPartition(DUMMY_TOPIC_NAME, 0);
+
consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(topicPartition));
+ kafkaSpout.deactivate();
+ verify(consumer).pause(Collections.singleton(topicPartition));
+ }
+}
diff --git a/contrib/spouts/kafka/java/heron-kafka-spout-parent/pom.xml
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/pom.xml
deleted file mode 100644
index bd1b5e97..0000000
--- a/contrib/spouts/kafka/java/heron-kafka-spout-parent/pom.xml
+++ /dev/null
@@ -1,76 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Copyright 2019
- ~
- ~ Licensed under the Apache License, Version 2.0 (the "License");
- ~ you may not use this file except in compliance with the License.
- ~ You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns="http://maven.apache.org/POM/4.0.0"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <groupId>org.apache.heron</groupId>
- <artifactId>heron-kafka-spout-parent</artifactId>
- <version>1.0-SNAPSHOT</version>
- <packaging>pom</packaging>
-
- <properties>
- <maven.compiler.source>1.8</maven.compiler.source>
- <maven.compiler.target>1.8</maven.compiler.target>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- </properties>
-
- <modules>
- <module>heron-kafka-spout</module>
- <module>heron-kafka-spout-sample</module>
- </modules>
-
- <dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>com.twitter.heron</groupId>
- <artifactId>heron-api</artifactId>
- <version>0.17.8</version>
- </dependency>
- <dependency>
- <groupId>com.twitter.heron</groupId>
- <artifactId>heron-storm</artifactId>
- <version>0.17.8</version>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>2.1.1</version>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-jdk14</artifactId>
- <version>1.7.26</version>
- </dependency>
- </dependencies>
- </dependencyManagement>
-
- <build>
- <pluginManagement>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <version>2.22.1</version>
- </plugin>
- </plugins>
- </pluginManagement>
- </build>
-
-</project>
\ No newline at end of file
diff --git
a/eco/tests/java/org/apache/heron/eco/builder/heron/HeronStreamBuilderTest.java
b/eco/tests/java/org/apache/heron/eco/builder/heron/HeronStreamBuilderTest.java
index 72ea2a8..3e9d73f 100644
---
a/eco/tests/java/org/apache/heron/eco/builder/heron/HeronStreamBuilderTest.java
+++
b/eco/tests/java/org/apache/heron/eco/builder/heron/HeronStreamBuilderTest.java
@@ -307,7 +307,8 @@ public class HeronStreamBuilderTest {
verify(mockContext).getTopologyDefinition();
verify(mockContext).getBolt(eq(to));
verify(mockDefinition).parallelismForBolt(eq(to));
- verify(mockTopologyBuilder).setBolt(eq(to), eq(mockIStatefulWindowedBolt),
eq(iRichBoltParallelism));
+ verify(mockTopologyBuilder).setBolt(eq(to), eq(mockIStatefulWindowedBolt),
+ eq(iRichBoltParallelism));
verify(mockBoltDeclarer).customGrouping(eq(from), eq(streamId),
eq(mockCustomStreamGrouping));
verify(mockContext).setStreams(anyMap());
verify(mockDefinition).getStreams();
diff --git a/scripts/get_all_heron_paths.sh b/scripts/get_all_heron_paths.sh
index 0bae1ed..42e9281 100755
--- a/scripts/get_all_heron_paths.sh
+++ b/scripts/get_all_heron_paths.sh
@@ -67,7 +67,7 @@ function get_package_of() {
}
function get_heron_java_paths() {
- local java_paths=$(find
{heron,heron/tools,tools,integration_test,storm-compatibility,eco,eco-storm-examples,eco-heron-examples}
-name "*.java" | sed "s|/src/java/.*$|/src/java|"| sed
"s|/java/src/.*$|/java/src|" | sed "s|/tests/java/.*$|/tests/java|" | sort -u
| fgrep -v "heron/scheduler/" | fgrep -v "heron/scheduler/" )
+ local java_paths=$(find
{heron,heron/tools,tools,integration_test,storm-compatibility,eco,eco-storm-examples,eco-heron-examples,contrib}
-name "*.java" | sed "s|/src/java/.*$|/src/java|"| sed
"s|/java/src/.*$|/java/src|" | sed "s|/tests/java/.*$|/tests/java|" | sort -u
| fgrep -v "heron/scheduler/" | fgrep -v "heron/scheduler/" )
if [ "$(uname -s | tr 'A-Z' 'a-z')" != "darwin" ]; then
java_paths=$(echo "${java_paths}" | fgrep -v "/objc_tools/")
fi
diff --git a/scripts/travis/build.sh b/scripts/travis/build.sh
index d767d3a..a02eddf 100755
--- a/scripts/travis/build.sh
+++ b/scripts/travis/build.sh
@@ -82,7 +82,7 @@ start_timer "$T"
python ${UTILS}/save-logs.py "heron_build.txt" bazel\
--bazelrc=tools/travis/bazel.rc build --config=$PLATFORM heron/... \
heronpy/... examples/... storm-compatibility-examples/... \
- eco-storm-examples/... eco-heron-examples/...
+ eco-storm-examples/... eco-heron-examples/... contrib/...
end_timer "$T"
# run heron unit tests
@@ -93,7 +93,7 @@ python ${UTILS}/save-logs.py "heron_test_non_flaky.txt" bazel\
--test_summary=detailed --test_output=errors\
--config=$PLATFORM --test_tag_filters=-flaky heron/... \
heronpy/... examples/... storm-compatibility-examples/... \
- eco-storm-examples/... eco-heron-examples/...
+ eco-storm-examples/... eco-heron-examples/... contrib/...
end_timer "$T"
# flaky tests are often due to test port race conditions,