[ 
https://issues.apache.org/jira/browse/STORM-2675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16119147#comment-16119147
 ] 

Stig Rohde Døssing commented on STORM-2675:
-------------------------------------------

[~Preet] Opened a PR here https://github.com/apache/storm/pull/2271. I'd 
appreciate it if you would check if it works for you.

> KafkaTridentSpoutOpaque not committing offsets to Kafka
> -------------------------------------------------------
>
>                 Key: STORM-2675
>                 URL: https://issues.apache.org/jira/browse/STORM-2675
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.1.0
>            Reporter: Preet Puri
>            Assignee: Stig Rohde Døssing
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Every time I restart the topology the spout was picking the earliest message 
> even though poll strategy is set UNCOMMITTED_EARLIEST.  I looked at Kafka's  
> __consumer_offsets topic to see if spout (consumer) is committing the offsets 
> but did not find any commits. I am not even able to locate the code in the 
> KafkaTridentSpoutEmitter class where we are updating the commits?
>     conf.put(Config.TOPOLOGY_DEBUG, true);
>     conf.put(Config.TOPOLOGY_WORKERS, 1);
>     conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 4); //tried with1 as well
>     conf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, "/aggregate");
>     conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Arrays.asList(new 
> String[]{"localhost"}));
>     conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, 2181);
>  protected static KafkaSpoutConfig<String, String> 
> getPMStatKafkaSpoutConfig() {
>     ByTopicRecordTranslator<String, String> byTopic =
>         new ByTopicRecordTranslator<>((r) -> new Values(r.topic(), r.key(), 
> r.value()),
>             new Fields(TOPIC, PARTITION_KEY, PAYLOAD), SENSOR_STREAM);
>     return new KafkaSpoutConfig.Builder<String, 
> String>(Utils.getBrokerHosts(),
>         StringDeserializer.class, null, Utils.getKafkaEnrichedPMSTopicName())
>             .setMaxPartitionFectchBytes(10 * 1024) // 10 KB
>             .setRetry(getRetryService())
>             .setOffsetCommitPeriodMs(10_000)
>             
> .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
>             .setMaxUncommittedOffsets(250)
>             .setProp("value.deserializer", 
> "io.confluent.kafka.serializers.KafkaAvroDeserializer")
>             .setProp("schema.registry.url","http://localhost:8081";)
>             .setProp("specific.avro.reader",true)
>             .setGroupId(AGGREGATION_CONSUMER_GROUP)
>             .setRecordTranslator(byTopic).build();
>   }
> Stream pmStatStream =
>         topology.newStream("statStream", new 
> KafkaTridentSpoutOpaque<>(getPMStatKafkaSpoutConfig())).parallelismHint(1)
> storm-version - 1.1.0



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to