[ 
https://issues.apache.org/jira/browse/STORM-2675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Preet Puri updated STORM-2675:
------------------------------
    Description: 
Every time I restart the topology, the spout picks up the earliest message 
even though the poll strategy is set to UNCOMMITTED_EARLIEST. I looked at Kafka's 
__consumer_offsets topic to see if the spout (consumer) is committing the offsets 
but did not find any commits. I am not even able to locate the code in the 
KafkaTridentSpoutEmitter class where the offset commits are performed.

    conf.put(Config.TOPOLOGY_DEBUG, true);
    conf.put(Config.TOPOLOGY_WORKERS, 1);
    conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 4); // tried with 1 as well
    conf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, "/aggregate");
    conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Arrays.asList(new 
String[]{"localhost"}));
    conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, 2181);

 protected static KafkaSpoutConfig<String, String> getPMStatKafkaSpoutConfig() {
    ByTopicRecordTranslator<String, String> byTopic =
        new ByTopicRecordTranslator<>((r) -> new Values(r.topic(), r.key(), 
r.value()),
            new Fields(TOPIC, PARTITION_KEY, PAYLOAD), SENSOR_STREAM);

    return new KafkaSpoutConfig.Builder<String, String>(Utils.getBrokerHosts(),
        StringDeserializer.class, null, Utils.getKafkaEnrichedPMSTopicName())
            .setMaxPartitionFectchBytes(10 * 1024) // 10 KB
            .setRetry(getRetryService())
            .setOffsetCommitPeriodMs(10_000)
            
.setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
            .setMaxUncommittedOffsets(250)
            .setProp("value.deserializer", 
"io.confluent.kafka.serializers.KafkaAvroDeserializer")
            .setProp("schema.registry.url","http://localhost:8081")
            .setProp("specific.avro.reader",true)
            .setGroupId(AGGREGATION_CONSUMER_GROUP)
            .setRecordTranslator(byTopic).build();
  }

Stream pmStatStream =
        topology.newStream("statStream", new 
KafkaTridentSpoutOpaque<>(getPMStatKafkaSpoutConfig())).parallelismHint(1)

storm-version - 1.1.0

  was:
Every time I restart the topology the spout was picking the earliest message 
even though poll strategy is set UNCOMMITTED_EARLIEST.  I looked at Kafka's  
__consumer_offsets topic to see it spout (Consumer is committing the offsets 
but did not find any commits). No even able to locate the code in the 
KafkaTridentSpoutEmitter class where we are updating the commits?

    conf.put(Config.TOPOLOGY_DEBUG, true);
    conf.put(Config.TOPOLOGY_WORKERS, 1);
    conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 4); //tried with1 as well
    conf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, "/aggregate");
    conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Arrays.asList(new 
String[]{"localhost"}));
    conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, 2181);

 protected static KafkaSpoutConfig<String, String> getPMStatKafkaSpoutConfig() {
    ByTopicRecordTranslator<String, String> byTopic =
        new ByTopicRecordTranslator<>((r) -> new Values(r.topic(), r.key(), 
r.value()),
            new Fields(TOPIC, PARTITION_KEY, PAYLOAD), SENSOR_STREAM);

    return new KafkaSpoutConfig.Builder<String, String>(Utils.getBrokerHosts(),
        StringDeserializer.class, null, Utils.getKafkaEnrichedPMSTopicName())
            .setMaxPartitionFectchBytes(10 * 1024) // 10 KB
            .setRetry(getRetryService())
            .setOffsetCommitPeriodMs(10_000)
            
.setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
            .setMaxUncommittedOffsets(250)
            .setProp("value.deserializer", 
"io.confluent.kafka.serializers.KafkaAvroDeserializer")
            .setProp("schema.registry.url","http://localhost:8081")
            .setProp("specific.avro.reader",true)
            .setGroupId(AGGREGATION_CONSUMER_GROUP)
            .setRecordTranslator(byTopic).build();
  }

Stream pmStatStream =
        topology.newStream("statStream", new 
KafkaTridentSpoutOpaque<>(getPMStatKafkaSpoutConfig())).parallelismHint(1)

storm-version - 1.1.0


> KafkaTridentSpoutOpaque not committing offsets to Kafka
> -------------------------------------------------------
>
>                 Key: STORM-2675
>                 URL: https://issues.apache.org/jira/browse/STORM-2675
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.1.0
>            Reporter: Preet Puri
>
> Every time I restart the topology, the spout picks up the earliest message 
> even though the poll strategy is set to UNCOMMITTED_EARLIEST. I looked at Kafka's 
> __consumer_offsets topic to see if the spout (consumer) is committing the offsets 
> but did not find any commits. I am not even able to locate the code in the 
> KafkaTridentSpoutEmitter class where the offset commits are performed.
>     conf.put(Config.TOPOLOGY_DEBUG, true);
>     conf.put(Config.TOPOLOGY_WORKERS, 1);
>     conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 4); // tried with 1 as well
>     conf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, "/aggregate");
>     conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Arrays.asList(new 
> String[]{"localhost"}));
>     conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, 2181);
>  protected static KafkaSpoutConfig<String, String> 
> getPMStatKafkaSpoutConfig() {
>     ByTopicRecordTranslator<String, String> byTopic =
>         new ByTopicRecordTranslator<>((r) -> new Values(r.topic(), r.key(), 
> r.value()),
>             new Fields(TOPIC, PARTITION_KEY, PAYLOAD), SENSOR_STREAM);
>     return new KafkaSpoutConfig.Builder<String, 
> String>(Utils.getBrokerHosts(),
>         StringDeserializer.class, null, Utils.getKafkaEnrichedPMSTopicName())
>             .setMaxPartitionFectchBytes(10 * 1024) // 10 KB
>             .setRetry(getRetryService())
>             .setOffsetCommitPeriodMs(10_000)
>             
> .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
>             .setMaxUncommittedOffsets(250)
>             .setProp("value.deserializer", 
> "io.confluent.kafka.serializers.KafkaAvroDeserializer")
>             .setProp("schema.registry.url","http://localhost:8081")
>             .setProp("specific.avro.reader",true)
>             .setGroupId(AGGREGATION_CONSUMER_GROUP)
>             .setRecordTranslator(byTopic).build();
>   }
> Stream pmStatStream =
>         topology.newStream("statStream", new 
> KafkaTridentSpoutOpaque<>(getPMStatKafkaSpoutConfig())).parallelismHint(1)
> storm-version - 1.1.0



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to