Does setting kafkaConfig.startOffsetTime = OffsetRequest.LatestTime() not work?
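In storm-kafka, the suggested workaround is the single assignment above on the spout's config object. A standalone snippet has neither the storm-kafka classes nor a broker, so the following is a hypothetical, self-contained model rather than spout code. Config stands in for KafkaConfig, and a partition is reduced to a [logStart, logEnd) offset range. The model assumes the Kafka 0.8 sentinel values EarliestTime() == -2 and LatestTime() == -1. It mirrors the getOffset(..., KafkaConfig) behaviour quoted later in the thread, which uses startOffsetTime unconditionally.

```java
// Hypothetical, self-contained model of the start-offset lookup discussed here.
// storm-kafka's real KafkaUtils.getOffset queries a broker through a SimpleConsumer;
// here a partition is just the offset range [logStart, logEnd).
final class StartOffsetWorkaround {
    // Sentinel "times" of the Kafka 0.8 offset API (assumed values):
    // OffsetRequest.EarliestTime() == -2L, OffsetRequest.LatestTime() == -1L.
    static final long EARLIEST_TIME = -2L;
    static final long LATEST_TIME = -1L;

    // Hypothetical stand-in for KafkaConfig, reduced to the field discussed.
    static final class Config {
        long startOffsetTime = EARLIEST_TIME; // default quoted in the thread
    }

    // Resolves a sentinel time against the partition's offset range.
    static long offsetForTime(long time, long logStart, long logEnd) {
        if (time == EARLIEST_TIME) {
            return logStart; // oldest retained message
        }
        if (time == LATEST_TIME) {
            return logEnd; // next offset to be written
        }
        throw new IllegalArgumentException("timestamp lookups are not modelled: " + time);
    }

    // Mirrors the current getOffset(consumer, topic, partition, KafkaConfig):
    // the configured startOffsetTime is used unconditionally.
    static long getOffset(Config config, long logStart, long logEnd) {
        return offsetForTime(config.startOffsetTime, logStart, logEnd);
    }

    public static void main(String[] args) {
        Config defaults = new Config();
        Config latest = new Config();
        latest.startOffsetTime = LATEST_TIME; // the suggested workaround

        // Partition holding offsets 100..499; the next offset to be written is 500.
        System.out.println(getOffset(defaults, 100, 500));
        System.out.println(getOffset(latest, 100, 500));
    }
}
```

Under these assumptions, the default config starts a fresh consumer group at offset 100 (the oldest retained message). The LatestTime() config starts at 500 and only sees messages written afterwards.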
On Sun, May 29, 2016 at 7:56 PM, Sachin Pasalkar wrote:

> I looked at the discussion thread. It looks like the change was made so that
> a new consumer starts reading data from the earliest offset rather than the
> latest. It does not consider the following case: if the data has changed and
> the user forgot to clear old data from the Kafka topic, a new consumer will
> read that stale data and cause a mess. A user who starts a new consumer
> expects it to read from the latest offset, or can set the offset explicitly.
> Defaulting to earliest is more error-prone in production.
>
> Thoughts?
>
> From: Sachin Pasalkar
> Date: Saturday, 28 May 2016 5:12 pm
> To: [email protected], Bobby Evans
> Cc: Narendra Bidari
> Subject: Re: Storm's Kafka spout is not reading latest data even with new consumer group
>
> Thanks, Bobby. I will ask on the ticket.
>
> From: Bobby Evans
> Date: Friday, 27 May 2016 7:45 pm
> To: [email protected]
> Cc: Narendra Bidari
> Subject: Re: Storm's Kafka spout is not reading latest data even with new consumer group
>
> Looks like it changed as part of
> https://issues.apache.org/jira/browse/STORM-563. That might be a good
> place to ask.
> Specifically, it was pull request https://github.com/apache/storm/pull/493.
> To me it looks like the code was updated to use ignoreZkOffsets instead of
> forceFromStart, but I have not dug into the exact details of the change to
> know what all the ramifications might have been.
> - Bobby
>
> On Thursday, May 26, 2016 10:13 PM, Sachin Pasalkar wrote:
>
> Can you look at this, please?
>
> From: Sachin Pasalkar
> Date: Thursday, 26 May 2016 9:35 pm
> To: [email protected]
> Cc: Narendra Bidari
> Subject: Re: Storm's Kafka spout is not reading latest data even with new consumer group
>
> Can anyone look at this?
>
> From: Sachin Pasalkar
> Date: Thursday, 26 May 2016 1:18 pm
> To: [email protected]
> Cc: Narendra Bidari
> Subject: Storm's Kafka spout is not reading latest data even with new consumer group
>
> Currently, if you use a new consumer group, the spout starts reading data
> from the earliest offset rather than the latest.
>
> In KafkaConfig:
>
>     public long startOffsetTime = OffsetRequest.EarliestTime();
>
> In the doEmitNewPartitionBatch method of storm.kafka.trident.TridentKafkaEmitter,
> when there is no stored metadata for the consumer group (lastMeta is null),
> execution goes to line 109:
>
>     long offset;
>     if (lastMeta != null) {
>         String lastInstanceId = null;
>         Map lastTopoMeta = (Map) lastMeta.get("topology");
>         if (lastTopoMeta != null) {
>             lastInstanceId = (String) lastTopoMeta.get("id");
>         }
>         // Reset to startOffsetTime only when ignoreZkOffsets is set and the
>         // stored metadata belongs to a different topology instance
>         if (_config.ignoreZkOffsets && !_topologyInstanceId.equals(lastInstanceId)) {
>             offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config.startOffsetTime);
>         } else {
>             // Resume from the offset recorded by the previous batch
>             offset = ((Long) lastMeta.get("nextOffset")).longValue();
>         }
>     } else {
>         // Line 109: no stored metadata, so the start offset comes from KafkaConfig
>         offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config);
>     }
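The branch structure above can be exercised without a broker. The sketch below is a hypothetical, self-contained model, not storm-kafka code. It keeps the metadata keys from the snippet ("topology", "id", "nextOffset") and replaces KafkaUtils.getOffset with a lookup against a partition whose log spans [logStart, logEnd). It assumes the Kafka 0.8 sentinels EarliestTime() == -2 and LatestTime() == -1. It shows that a brand-new consumer group (lastMeta == null) falls through to startOffsetTime, whose default is EarliestTime().

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, self-contained model of the offset selection in
// TridentKafkaEmitter.doEmitNewPartitionBatch. Broker lookups are replaced by a
// partition whose log spans [logStart, logEnd); metadata keys match the snippet.
final class EmitterOffsetModel {
    static final long EARLIEST_TIME = -2L; // assumed OffsetRequest.EarliestTime()
    static final long LATEST_TIME = -1L;   // assumed OffsetRequest.LatestTime()

    // Stand-in for the broker lookup; only the two sentinel times are modelled.
    static long offsetForTime(long time, long logStart, long logEnd) {
        if (time == EARLIEST_TIME) {
            return logStart;
        }
        if (time == LATEST_TIME) {
            return logEnd;
        }
        throw new IllegalArgumentException("timestamp lookups are not modelled: " + time);
    }

    static long startOffset(Map<String, Object> lastMeta, boolean ignoreZkOffsets,
                            long startOffsetTime, String topologyInstanceId,
                            long logStart, long logEnd) {
        if (lastMeta != null) {
            String lastInstanceId = null;
            @SuppressWarnings("unchecked")
            Map<String, Object> lastTopoMeta = (Map<String, Object>) lastMeta.get("topology");
            if (lastTopoMeta != null) {
                lastInstanceId = (String) lastTopoMeta.get("id");
            }
            if (ignoreZkOffsets && !topologyInstanceId.equals(lastInstanceId)) {
                // New topology instance with ignoreZkOffsets: reset to startOffsetTime.
                return offsetForTime(startOffsetTime, logStart, logEnd);
            }
            // Otherwise resume where the previous batch stopped.
            return (Long) lastMeta.get("nextOffset");
        }
        // No stored metadata (e.g. a new consumer group): the line-109 path. The
        // current getOffset(..., KafkaConfig) simply uses startOffsetTime.
        return offsetForTime(startOffsetTime, logStart, logEnd);
    }

    // Builds metadata shaped like the map the snippet reads from.
    static Map<String, Object> meta(String topologyId, long nextOffset) {
        Map<String, Object> topology = new HashMap<>();
        topology.put("id", topologyId);
        Map<String, Object> result = new HashMap<>();
        result.put("topology", topology);
        result.put("nextOffset", nextOffset);
        return result;
    }

    public static void main(String[] args) {
        // Partition holding offsets 100..499; the next offset to be written is 500.
        System.out.println(startOffset(null, false, EARLIEST_TIME, "topo-2", 100, 500));
        System.out.println(startOffset(meta("topo-1", 420), false, EARLIEST_TIME, "topo-2", 100, 500));
        System.out.println(startOffset(meta("topo-1", 420), true, LATEST_TIME, "topo-2", 100, 500));
    }
}
```

This prints 100, 420 and 500 in that order. The first value is a new consumer group with the default config. The second reuses the stored nextOffset. The third is an ignoreZkOffsets reset for a new topology instance with startOffsetTime = LatestTime().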
>
> This calls the method below. As you can see, with the default startOffsetTime
> this call fetches the earliest offset rather than the latest:
>
>     public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
>         long startOffsetTime = config.startOffsetTime;
>         return getOffset(consumer, topic, partition, startOffsetTime);
>     }
>
> How it should be (this was present in the previous 0.9.x release):
>
>     public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
>         // Default to the latest offset; use startOffsetTime only when ZK offsets are ignored
>         long startOffsetTime = kafka.api.OffsetRequest.LatestTime();
>         if (config.ignoreZkOffsets) {
>             startOffsetTime = config.startOffsetTime;
>         }
>         return getOffset(consumer, topic, partition, startOffsetTime);
>     }
>
> This code was present earlier, but somehow it was removed. I tried searching
> GitHub but didn't find the history of the change.
>
> Thanks,
> Sachin

--
Regards,
Abhishek Agarwal
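The two getOffset variants in Sachin's original message differ in exactly one configuration once they are reduced to the choice of time sentinel. The hypothetical model below is not storm-kafka code. It assumes Kafka 0.8's -2 for EarliestTime() and -1 for LatestTime(), and it prints all four combinations of startOffsetTime and ignoreZkOffsets for the case where no stored metadata exists.

```java
// Hypothetical model of the two KafkaUtils.getOffset(..., KafkaConfig) variants
// quoted in the thread. Only the choice of time sentinel is modelled; the broker
// lookup that turns the sentinel into an offset is not.
final class GetOffsetVariants {
    static final long EARLIEST_TIME = -2L; // assumed OffsetRequest.EarliestTime()
    static final long LATEST_TIME = -1L;   // assumed OffsetRequest.LatestTime()

    // Current code: the configured startOffsetTime is always used;
    // ignoreZkOffsets is deliberately unused to mirror that.
    static long currentTime(long startOffsetTime, boolean ignoreZkOffsets) {
        return startOffsetTime;
    }

    // Proposed code: default to latest, honour startOffsetTime only with ignoreZkOffsets.
    static long proposedTime(long startOffsetTime, boolean ignoreZkOffsets) {
        long time = LATEST_TIME;
        if (ignoreZkOffsets) {
            time = startOffsetTime;
        }
        return time;
    }

    // Human-readable name for a sentinel time.
    static String name(long time) {
        return time == EARLIEST_TIME ? "earliest" : time == LATEST_TIME ? "latest" : Long.toString(time);
    }

    public static void main(String[] args) {
        long[] times = {EARLIEST_TIME, LATEST_TIME};
        boolean[] flags = {false, true};
        for (long t : times) {
            for (boolean ignore : flags) {
                System.out.printf("startOffsetTime=%-8s ignoreZkOffsets=%-5b current=%-8s proposed=%s%n",
                        name(t), ignore, name(currentTime(t, ignore)), name(proposedTime(t, ignore)));
            }
        }
    }
}
```

Only the default configuration (startOffsetTime = EarliestTime(), ignoreZkOffsets = false) disagrees. In that case the current code starts a new consumer group at the earliest offset and the proposed code starts it at the latest. With the proposed code, starting a new group from the earliest offset would also require setting ignoreZkOffsets.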
