Hi Senthil, you should ask this question on the Apache Storm mailing list.
At first sight this looks like a problem with Storm's KafkaSpout implementation, not with Kafka.

Best wishes,
Michael

On Thu, Sep 28, 2017 at 8:47 PM, senthil kumar <gupthasent...@gmail.com> wrote:
> Hi Kafka,
>
> I have a Trident topology in Storm which consumes data from Kafka. Now I am
> seeing an issue in KafkaSpout: it is not consuming the very first
> uncommitted offset from Kafka.
>
> My Storm version is 1.1.1 and my Kafka version is 0.11.0.0. I have a topic,
> say X, with 3 partitions.
>
> I have the following configuration to consume data using KafkaSpout:
>
>     KafkaSpoutConfig<String, String> kafkaConfig = KafkaSpoutConfig
>             .builder(PropertyUtil.getStringValue(PropertyUtil.KAFKA_BROKERS),
>                      PropertyUtil.getStringValue(PropertyUtil.TOPIC_NAME))
>             .setProp(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "4194304")
>             .setProp(ConsumerConfig.GROUP_ID_CONFIG,
>                      PropertyUtil.getStringValue(PropertyUtil.CONSUMER_ID))
>             .setProp(ConsumerConfig.RECEIVE_BUFFER_CONFIG, "4194304")
>             .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
>             .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_LATEST)
>             .build();
>
>     TopologyBuilder builder = new TopologyBuilder();
>     builder.setSpout("spout", new KafkaSpout<String, String>(kafkaConfig), 3);
>
> Following are my test cases:
>
> 1. Started the processor with a new consumer id. The very first time, it
>    starts reading data from the latest offset. Fine.
> 2. Sent some messages to Kafka; all of them are consumed by my Trident
>    topology.
> 3. Stopped my Trident topology.
> 4. Sent some messages to Kafka (partition_0), for example:
>        msg_1
>        msg_2
>        msg_3
>        msg_4
>        msg_5
> 5. Started the topology. KafkaSpout consumes the data from msg_2; it does
>    not consume msg_1.
> 6. Stopped the topology.
> 7. Sent some messages to all the partitions (_0, _1, _2), for example:
>        Partition_0: msg_6, msg_7, msg_8
>        Partition_1: msg_9, msg_10, msg_11
>        Partition_2: msg_12, msg_13, msg_14
> 8. Started the topology. KafkaSpout consumes the following messages:
>        msg_7, msg_8, msg_10, msg_11, msg_13, msg_14
>
> It skipped the earliest uncommitted message in each partition.
>
> Below is the definition of UNCOMMITTED_LATEST in the Javadoc:
>
>     UNCOMMITTED_LATEST means that the kafka spout polls records from the last
>     committed offset, if any. If no offset has been committed, it behaves as
>     LATEST.
>
> As per the definition, it should read from the last committed offset. But it
> looks like it is reading from the earliest uncommitted offset + 1, i.e. the
> starting pointer seems to be off by one.
>
> Please have a look and let me know if anything is wrong in my tests.
>
> I am expecting a response from you, even if it is not an issue.
>
> Thanks,
> Senthil
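For anyone reproducing this on the Storm list, the symptom can be modeled without Kafka or Storm at all. The one real fact used here is Kafka's consumer convention that a committed offset names the *next* record to read (last processed offset + 1), so a consumer resuming correctly starts exactly at the committed offset. The class `OffByOneDemo`, the method `resume`, the `skipOne` flag and the placeholder records `old_a`/`old_b` are hypothetical; this is a toy sketch of the reported behaviour, not Storm's KafkaSpout code, and it does not establish where (or whether) Storm 1.1.1 adds the extra 1.

    import java.util.ArrayList;
    import java.util.List;

    public class OffByOneDemo {

        // Hypothetical model of one partition: the record at list index i has offset i.
        // Returns every record a consumer would see when resuming from the group's
        // committed offset. Under Kafka's convention the committed offset is the
        // offset of the NEXT record to read, so a correct consumer starts exactly there.
        public static List<String> resume(List<String> partition, long committedOffset, boolean skipOne) {
            // skipOne models the suspected bug: treating the committed offset as the
            // last record already processed and starting one past it.
            long start = skipOne ? committedOffset + 1 : committedOffset;
            List<String> seen = new ArrayList<>();
            for (long offset = start; offset < partition.size(); offset++) {
                seen.add(partition.get((int) offset));
            }
            return seen;
        }

        public static void main(String[] args) {
            List<String> partition0 = new ArrayList<>();
            // Offsets 0 and 1: records consumed and committed before the topology was stopped.
            partition0.add("old_a");
            partition0.add("old_b");
            long committed = partition0.size(); // next offset to read = 2
            // Records produced while the topology was down (step 4 in the quoted test).
            for (int i = 1; i <= 5; i++) {
                partition0.add("msg_" + i);
            }
            System.out.println("correct:  " + resume(partition0, committed, false)); // [msg_1, msg_2, msg_3, msg_4, msg_5]
            System.out.println("observed: " + resume(partition0, committed, true));  // [msg_2, msg_3, msg_4, msg_5]
        }
    }

The "observed" line matches what Senthil reports in step 5 (msg_1 is never delivered), and applying the same rule per partition gives the step 8 result. That is consistent with a spout-side seek to committed offset + 1 rather than anything on the Kafka broker side.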