Preet Puri created STORM-2675:
---------------------------------
Summary: KafkaTridentSpoutOpaque not committing offsets to Kafka
Key: STORM-2675
URL: https://issues.apache.org/jira/browse/STORM-2675
Project: Apache Storm
Issue Type: Bug
Components: storm-kafka-client
Affects Versions: 1.1.0
Reporter: Preet Puri
Every time I restart the topology, the spout starts from the earliest message
even though the first poll offset strategy is set to UNCOMMITTED_EARLIEST. I
looked at Kafka's __consumer_offsets topic to see whether the spout (consumer)
is committing offsets, but did not find any commits. I am also not able to
locate the code in the KafkaTridentSpoutEmitter class where the commits are
updated.
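For reference, the behaviour I expect from the first poll offset strategy can be sketched as below. This is a simplified, self-contained model of the documented semantics as I understand them, not Storm's actual code; the class FirstPollSketch and the method startOffset are hypothetical names used only for illustration.

```java
import java.util.OptionalLong;

// Hypothetical model of FirstPollOffsetStrategy semantics; not Storm code.
final class FirstPollSketch {

    enum Strategy { EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST }

    // Returns the position the consumer would start from on the first poll.
    // committed is empty when the group has no committed offset for the partition.
    static long startOffset(Strategy strategy, OptionalLong committed,
                            long earliest, long latest) {
        switch (strategy) {
            case EARLIEST:
                return earliest;                  // always ignore commits
            case LATEST:
                return latest;                    // always ignore commits
            case UNCOMMITTED_EARLIEST:
                // Resume from the commit if one exists, else fall back to earliest.
                return committed.isPresent() ? committed.getAsLong() : earliest;
            case UNCOMMITTED_LATEST:
                // Resume from the commit if one exists, else fall back to latest.
                return committed.isPresent() ? committed.getAsLong() : latest;
            default:
                throw new IllegalArgumentException("Unknown strategy: " + strategy);
        }
    }

    public static void main(String[] args) {
        // With a commit present, the restart should resume from it.
        System.out.println(startOffset(Strategy.UNCOMMITTED_EARLIEST,
                OptionalLong.of(42L), 0L, 100L));
        // With no commit, the restart falls back to the earliest offset.
        System.out.println(startOffset(Strategy.UNCOMMITTED_EARLIEST,
                OptionalLong.empty(), 0L, 100L));
    }
}
```

Under this model, falling back to the earliest offset on every restart is exactly what happens when no offset was ever committed for the group, which matches the empty __consumer_offsets observation.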
conf.put(Config.TOPOLOGY_DEBUG, true);
conf.put(Config.TOPOLOGY_WORKERS, 1);
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 4); // tried with 1 as well
conf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, "/aggregate");
conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Arrays.asList(new String[]{"localhost"}));
conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, 2181);
protected static KafkaSpoutConfig<String, String> getPMStatKafkaSpoutConfig() {
    ByTopicRecordTranslator<String, String> byTopic =
        new ByTopicRecordTranslator<>((r) -> new Values(r.topic(), r.key(), r.value()),
            new Fields(TOPIC, PARTITION_KEY, PAYLOAD), SENSOR_STREAM);
    return new KafkaSpoutConfig.Builder<String, String>(Utils.getBrokerHosts(),
            StringDeserializer.class, null, Utils.getKafkaEnrichedPMSTopicName())
        .setMaxPartitionFectchBytes(10 * 1024) // 10 KB
        .setRetry(getRetryService())
        .setOffsetCommitPeriodMs(10_000)
        .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
        .setMaxUncommittedOffsets(250)
        .setProp("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")
        .setProp("schema.registry.url", "http://localhost:8081")
        .setProp("specific.avro.reader", true)
        .setGroupId(AGGREGATION_CONSUMER_GROUP)
        .setRecordTranslator(byTopic).build();
}
Stream pmStatStream =
    topology.newStream("statStream",
        new KafkaTridentSpoutOpaque<>(getPMStatKafkaSpoutConfig())).parallelismHint(1);
storm-version - 1.1.0