Does setting kafkaConfig.startOffsetTime = OffsetRequest.LatestTime() not
work?
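
To make the question concrete, here is a rough, self-contained sketch of the two offset-selection variants quoted below. It is not the actual storm-kafka code: Config is a hypothetical stand-in for KafkaConfig with only the two fields discussed in this thread, and the constants are the sentinel values that OffsetRequest.LatestTime() and OffsetRequest.EarliestTime() return in Kafka (-1 and -2).

```java
public class StartOffsetSketch {
    // Sentinel values returned by Kafka's OffsetRequest.LatestTime() / EarliestTime().
    static final long LATEST_TIME = -1L;
    static final long EARLIEST_TIME = -2L;

    // Hypothetical stand-in for KafkaConfig (assumption: only these two fields matter here).
    static class Config {
        long startOffsetTime = EARLIEST_TIME; // default quoted in this thread
        boolean ignoreZkOffsets = false;
    }

    // Mirrors the current getOffset(..., KafkaConfig) quoted below:
    // the configured startOffsetTime is always used.
    static long currentStartTime(Config config) {
        return config.startOffsetTime;
    }

    // Mirrors the "how it should be" variant quoted below:
    // latest unless ignoreZkOffsets is set.
    static long legacyStartTime(Config config) {
        long startOffsetTime = LATEST_TIME;
        if (config.ignoreZkOffsets) {
            startOffsetTime = config.startOffsetTime;
        }
        return startOffsetTime;
    }

    static String describe(long time) {
        return time == LATEST_TIME ? "latest" : time == EARLIEST_TIME ? "earliest" : Long.toString(time);
    }

    public static void main(String[] args) {
        Config defaults = new Config();

        Config overridden = new Config();
        overridden.startOffsetTime = LATEST_TIME; // the setting suggested above

        System.out.println("current, defaults:   " + describe(currentStartTime(defaults)));
        System.out.println("current, overridden: " + describe(currentStartTime(overridden)));
        System.out.println("legacy, defaults:    " + describe(legacyStartTime(defaults)));
    }
}
```

If the real code path behaves like currentStartTime, then overriding startOffsetTime should be enough for a new consumer group to start from the latest offset, without restoring the older logic.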

On Sun, May 29, 2016 at 7:56 PM, Sachin Pasalkar <
[email protected]> wrote:

> I looked at the discussion thread. It looks like the user made this change
> so that a new consumer starts reading data from the earliest offset rather
> than the latest. They haven’t considered the following case: if the data
> changes and the user forgot to clear old data from the Kafka topic, it will
> cause a mess. (A user who starts with a new consumer will expect to read
> from the latest offset, or can set the offset explicitly.) Defaulting to
> earliest is more error-prone in PROD.
>
> Thoughts?
>
> From: Sachin Pasalkar <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Saturday, 28 May 2016 5:12 pm
> To: "[email protected]" <[email protected]>, Bobby Evans <[email protected]>
> Cc: Narendra Bidari <[email protected]>
> Subject: Re: Storm's Kafka spout is not reading latest data even with new
> consumer group
>
> Thanks Bobby. I will ask on ticket.
>
> From: Bobby Evans <[email protected]>
> Reply-To: "[email protected]" <[email protected]>, Bobby Evans <[email protected]>
> Date: Friday, 27 May 2016 7:45 pm
> To: "[email protected]" <[email protected]>
> Cc: Narendra Bidari <[email protected]>
> Subject: Re: Storm's Kafka spout is not reading latest data even with new
> consumer group
>
> Looks like it changed as a part of
> https://issues.apache.org/jira/browse/STORM-563.  That might be a good
> place to ask.
> Specifically it was pull request https://github.com/apache/storm/pull/493.
> To me it looks like the code was updated to use ignoreZKOffsets instead of
> forceFromStart, but I have not dug into the exact details of the change to
> know what all the ramifications might have been.
> - Bobby
>
>     On Thursday, May 26, 2016 10:13 PM, Sachin Pasalkar
> <[email protected]> wrote:
>
> Can you look at this please?
>
> From: Sachin Pasalkar <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Thursday, 26 May 2016 9:35 pm
> To: "[email protected]" <[email protected]>
> Cc: Narendra Bidari <[email protected]>
> Subject: Re: Storm's Kafka spout is not reading latest data even with new
> consumer group
>
> Can anyone look at this?
>
> From: Sachin Pasalkar <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Thursday, 26 May 2016 1:18 pm
> To: "[email protected]" <[email protected]>
> Cc: Narendra Bidari <[email protected]>
> Subject: Storm's Kafka spout is not reading latest data even with new
> consumer group
>
> Currently, if you supply a new consumer group, the spout starts reading
> data from the earliest offset rather than the latest.
>
> In KafkaConfig
>
> public long startOffsetTime = OffsetRequest.EarliestTime();
>
>
> In the doEmitNewPartitionBatch method of
> storm.kafka.trident.TridentKafkaEmitter, if there is no previous metadata
> for the consumer group (lastMeta is null), the call goes to line 109:
>
>   if (lastMeta != null) {
> /*  98 */      String lastInstanceId = null;
> /*  99 */      Map lastTopoMeta = (Map)lastMeta.get("topology");
> /* 100 */      if (lastTopoMeta != null)
> /* 101 */        lastInstanceId = (String)lastTopoMeta.get("id");
> /*    */      long offset;
> /* 103 */      if ((_config.ignoreZkOffsets) && (!_topologyInstanceId.equals(lastInstanceId))) {
> /* 104 */        offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config.startOffsetTime);
> /*    */      } else {
> /* 106 */        offset = ((Long)lastMeta.get("nextOffset")).longValue();
> /*    */      }
> /*    */    } else {
> /* 109 */      offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config);
> /*    */    }
>
> This calls the method below. As you can see, the call fetches from the
> earliest offset rather than the latest:
>
> public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
>     long startOffsetTime = config.startOffsetTime;
>     return getOffset(consumer, topic, partition, startOffsetTime);
> }
>
>
>
> How it should be (this was the behavior in the previous 0.9.x release):
>
> public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
>     long startOffsetTime = kafka.api.OffsetRequest.LatestTime();
>     if (config.ignoreZkOffsets) {
>         startOffsetTime = config.startOffsetTime;
>     }
>     return getOffset(consumer, topic, partition, startOffsetTime);
> }
>
>
>
> This code was present earlier, but somehow it got removed. I tried to
> search on GitHub but didn't find the history of the change.
>
> Thanks,
>
> Sachin
>


-- 
Regards,
Abhishek Agarwal
