[
https://issues.apache.org/jira/browse/STORM-2675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jungtaek Lim resolved STORM-2675.
---------------------------------
Resolution: Fixed
Fix Version/s: 1.2.0
Also merged into 1.x branch.
> KafkaTridentSpoutOpaque not committing offsets to Kafka
> -------------------------------------------------------
>
> Key: STORM-2675
> URL: https://issues.apache.org/jira/browse/STORM-2675
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-kafka-client
> Affects Versions: 1.1.0
> Reporter: Preet Puri
> Assignee: Stig Rohde Døssing
> Labels: pull-request-available
> Fix For: 2.0.0, 1.2.0
>
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
> Every time I restart the topology, the spout picks up the earliest message
> even though the first poll offset strategy is set to UNCOMMITTED_EARLIEST.
> I looked at Kafka's __consumer_offsets topic to see whether the spout
> (consumer) is committing offsets, but did not find any commits. I could not
> even locate the code in the KafkaTridentSpoutEmitter class where the offsets
> are committed.
> conf.put(Config.TOPOLOGY_DEBUG, true);
> conf.put(Config.TOPOLOGY_WORKERS, 1);
> conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 4); // tried with 1 as well
> conf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, "/aggregate");
> conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS,
>     Arrays.asList(new String[]{"localhost"}));
> conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, 2181);
> protected static KafkaSpoutConfig<String, String> getPMStatKafkaSpoutConfig() {
>     ByTopicRecordTranslator<String, String> byTopic =
>         new ByTopicRecordTranslator<>(
>             (r) -> new Values(r.topic(), r.key(), r.value()),
>             new Fields(TOPIC, PARTITION_KEY, PAYLOAD), SENSOR_STREAM);
>     return new KafkaSpoutConfig.Builder<String, String>(Utils.getBrokerHosts(),
>             StringDeserializer.class, null, Utils.getKafkaEnrichedPMSTopicName())
>         .setMaxPartitionFectchBytes(10 * 1024) // 10 KB
>         .setRetry(getRetryService())
>         .setOffsetCommitPeriodMs(10_000)
>         .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
>         .setMaxUncommittedOffsets(250)
>         .setProp("value.deserializer",
>             "io.confluent.kafka.serializers.KafkaAvroDeserializer")
>         .setProp("schema.registry.url", "http://localhost:8081")
>         .setProp("specific.avro.reader", true)
>         .setGroupId(AGGREGATION_CONSUMER_GROUP)
>         .setRecordTranslator(byTopic).build();
> }
> Stream pmStatStream =
>     topology.newStream("statStream",
>         new KafkaTridentSpoutOpaque<>(getPMStatKafkaSpoutConfig())).parallelismHint(1)
> Storm version: 1.1.0
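
The symptom in the report matches what UNCOMMITTED_EARLIEST is expected to do when no committed offset exists for the consumer group: it falls back to the earliest offset. The following is a minimal sketch of that selection rule in plain Java, not the storm-kafka-client implementation. The class FirstPollSketch, the method startOffset, and its parameters are hypothetical names introduced only for illustration; the enum values mirror the names of FirstPollOffsetStrategy.

```java
import java.util.OptionalLong;

// Illustrative model only; this is NOT the storm-kafka-client code.
class FirstPollSketch {

    // Mirrors the value names of storm-kafka-client's FirstPollOffsetStrategy.
    enum Strategy { EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST }

    /**
     * Picks the offset to start from on the first poll after a (re)start.
     *
     * @param committed offset committed for the consumer group, empty if none
     *                  (hypothetical parameter for this sketch)
     * @param earliest  earliest available offset in the partition
     * @param latest    latest offset in the partition
     */
    static long startOffset(Strategy strategy, OptionalLong committed,
                            long earliest, long latest) {
        switch (strategy) {
            case EARLIEST:
                return earliest;                   // ignore any commit
            case LATEST:
                return latest;                     // ignore any commit
            case UNCOMMITTED_EARLIEST:
                return committed.orElse(earliest); // resume, else earliest
            case UNCOMMITTED_LATEST:
                return committed.orElse(latest);   // resume, else latest
            default:
                throw new IllegalArgumentException("Unknown strategy: " + strategy);
        }
    }

    public static void main(String[] args) {
        long earliest = 0L;
        long latest = 500L;
        // No commit was ever written: the spout starts from the earliest offset.
        System.out.println(startOffset(Strategy.UNCOMMITTED_EARLIEST,
                OptionalLong.empty(), earliest, latest));   // prints 0
        // A commit exists: the spout resumes from it.
        System.out.println(startOffset(Strategy.UNCOMMITTED_EARLIEST,
                OptionalLong.of(120L), earliest, latest));  // prints 120
    }
}
```

Under this model, a spout that never commits to Kafka always takes the first branch on restart, which is consistent with the reporter finding no commits in __consumer_offsets.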