Robert Joseph Evans created STORM-2225:
------------------------------------------
Summary: Kafka New API make simple things simple
Key: STORM-2225
URL: https://issues.apache.org/jira/browse/STORM-2225
Project: Apache Storm
Issue Type: Improvement
Components: storm-kafka-client
Affects Versions: 1.0.0, 2.0.0
Reporter: Robert Joseph Evans
Assignee: Robert Joseph Evans
The Kafka spouts in storm-kafka-client use the new Kafka consumer API and are very extensible,
but doing very simple things takes far too many lines of code.
For example, to create a KafkaTridentSpoutOpaque you need the following code
(taken from the example):
{code}
private KafkaTridentSpoutOpaque<String, String> newKafkaTridentSpoutOpaque() {
    return new KafkaTridentSpoutOpaque<>(new KafkaTridentSpoutManager<>(
            newKafkaSpoutConfig(newKafkaSpoutStreams())));
}

private KafkaSpoutConfig<String, String> newKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams) {
    return new KafkaSpoutConfig.Builder<>(newKafkaConsumerProps(),
            kafkaSpoutStreams, newTuplesBuilder(), newRetryService())
            .setOffsetCommitPeriodMs(10_000)
            .setFirstPollOffsetStrategy(EARLIEST)
            .setMaxUncommittedOffsets(250)
            .build();
}

protected Map<String, Object> newKafkaConsumerProps() {
    Map<String, Object> props = new HashMap<>();
    props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "127.0.0.1:9092");
    props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup");
    props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER,
            "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER,
            "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("max.partition.fetch.bytes", 200);
    return props;
}

// TOPIC_1 and TOPIC_2 are constants defined elsewhere in the example class.
protected KafkaSpoutTuplesBuilder<String, String> newTuplesBuilder() {
    return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
            new TopicsTupleBuilder<String, String>(TOPIC_1, TOPIC_2))
            .build();
}

protected KafkaSpoutRetryService newRetryService() {
    return new KafkaSpoutRetryExponentialBackoff(
            new KafkaSpoutRetryExponentialBackoff.TimeInterval(500L, TimeUnit.MICROSECONDS),
            KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2),
            Integer.MAX_VALUE,
            KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));
}

// Declares the single output field "str" for both topics.
protected KafkaSpoutStreams newKafkaSpoutStreams() {
    return new KafkaSpoutStreamsNamedTopics.Builder(new Fields("str"),
            new String[]{"test-trident", "test-trident-1"}).build();
}

// A whole subclass, needed only to emit the record value as the tuple.
protected static class TopicsTupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K, V> {
    public TopicsTupleBuilder(String... topics) {
        super(topics);
    }

    @Override
    public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
        return new Values(consumerRecord.value());
    }
}
{code}
All of this just so I can have a Trident spout that reads <String, String> records
from localhost (127.0.0.1:9092) on the topics "test-trident" and "test-trident-1" and
outputs the value as the field "str".
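The TopicsTupleBuilder subclass in the example exists only to emit the record value as "str". A minimal sketch of how a lambda could replace such a subclass, assuming a hypothetical Translator that pairs the output field names with a function from record to tuple values. SimpleRecord is a hypothetical stand-in for Kafka's ConsumerRecord; neither class is part of storm-kafka-client.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

public class TranslatorSketch {
    // Hypothetical stand-in for Kafka's ConsumerRecord, keeping only the fields used here.
    static final class SimpleRecord<K, V> {
        final String topic;
        final K key;
        final V value;

        SimpleRecord(String topic, K key, V value) {
            this.topic = topic;
            this.key = key;
            this.value = value;
        }
    }

    // Hypothetical translator: declared output fields plus a record -> tuple function.
    static final class Translator<K, V> {
        final List<String> fields;
        final Function<SimpleRecord<K, V>, List<Object>> func;

        Translator(Function<SimpleRecord<K, V>, List<Object>> func, String... fields) {
            this.func = func;
            this.fields = Arrays.asList(fields);
        }

        List<Object> apply(SimpleRecord<K, V> record) {
            return func.apply(record);
        }
    }

    public static void main(String[] args) {
        // One line instead of a subclass: emit only the value, declared as field "str".
        Translator<String, String> valueAsStr = new Translator<String, String>(
                r -> Collections.<Object>singletonList(r.value), "str");
        SimpleRecord<String, String> rec =
                new SimpleRecord<String, String>("test-trident", "k", "hello");
        System.out.println(valueAsStr.fields + " -> " + valueAsStr.apply(rec));
    }
}
```

Running main prints `[str] -> [hello]`. Keeping the field names next to the function also removes the need to declare them separately, as newKafkaSpoutStreams does.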
I shouldn't need 50 lines of code for something I can explain in 3 lines of
text. It feels like we need better defaults, and less boilerplate, for a lot of
these common cases.
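A minimal sketch of what "better defaults" could mean for the consumer properties, assuming a hypothetical Builder where only the bootstrap servers and topics are required and everything else (String deserializers, a group id) gets an overridable default. SimpleSpoutConfig, Builder, setProp and the default group id are illustrative assumptions, not storm-kafka-client API; the property keys are standard Kafka consumer settings.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SpoutConfigSketch {
    static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    // Hypothetical immutable config produced by the builder.
    static final class SimpleSpoutConfig {
        final List<String> topics;
        final Map<String, Object> consumerProps;

        SimpleSpoutConfig(List<String> topics, Map<String, Object> consumerProps) {
            this.topics = Collections.unmodifiableList(topics);
            this.consumerProps = Collections.unmodifiableMap(consumerProps);
        }
    }

    // Hypothetical builder: only bootstrap servers and topics are required.
    static final class Builder {
        private final List<String> topics;
        private final Map<String, Object> props = new HashMap<>();

        Builder(String bootstrapServers, String... topics) {
            this.topics = Arrays.asList(topics);
            // Defaults for the common <String, String> case; all can be overridden.
            props.put("bootstrap.servers", bootstrapServers);
            props.put("key.deserializer", STRING_DESERIALIZER);
            props.put("value.deserializer", STRING_DESERIALIZER);
            props.put("group.id", "storm-kafka-spout"); // hypothetical default group id
        }

        // Escape hatch for anything the defaults do not cover.
        Builder setProp(String key, Object value) {
            props.put(key, value);
            return this;
        }

        SimpleSpoutConfig build() {
            // Copy so later setProp calls on this builder do not affect the built config.
            return new SimpleSpoutConfig(topics, new HashMap<>(props));
        }
    }

    public static void main(String[] args) {
        SimpleSpoutConfig conf = new Builder("127.0.0.1:9092", "test-trident", "test-trident-1")
                .setProp("max.partition.fetch.bytes", 200)
                .build();
        System.out.println(conf.topics + " " + conf.consumerProps.get("bootstrap.servers"));
    }
}
```

Running main prints `[test-trident, test-trident-1] 127.0.0.1:9092`. The setProp escape hatch is what keeps the extensibility: the simple case stays short, and anything not covered by a default can still be set explicitly.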
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)