Looks like it changed as a part of
https://issues.apache.org/jira/browse/STORM-563. That might be a good place to
ask.
Specifically, it was pull request https://github.com/apache/storm/pull/493.
To me it looks like the code was updated to use ignoreZkOffsets instead of
forceFromStart, but I have not dug into the details of the change enough to know
all of its ramifications.
- Bobby
On Thursday, May 26, 2016 10:13 PM, Sachin Pasalkar
<[email protected]> wrote:
Can you look at this please?
From: Sachin Pasalkar <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Thursday, 26 May 2016 9:35 pm
To: "[email protected]" <[email protected]>
Cc: Narendra Bidari <[email protected]>
Subject: Re: Storm's Kafka spout is not reading latest data even with new consumer group
Can anyone look at this?
From: Sachin Pasalkar <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Thursday, 26 May 2016 1:18 pm
To: "[email protected]" <[email protected]>
Cc: Narendra Bidari <[email protected]>
Subject: Storm's Kafka spout is not reading latest data even with new consumer group
Currently, if you use a new consumer group, the spout starts reading data from
the earliest offset rather than the latest one.
In KafkaConfig, the default start offset is the earliest one:
public long startOffsetTime = OffsetRequest.EarliestTime();
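For context, OffsetRequest.EarliestTime() and OffsetRequest.LatestTime() in the Kafka 0.8 offset API return the sentinel values -2 and -1, not real timestamps. The sketch below is a self-contained model rather than storm-kafka code: OffsetSentinels and SketchKafkaConfig are hypothetical stand-ins that reproduce only the default quoted above and show what overriding it would select.

```java
// Self-contained sketch; these classes are hypothetical stand-ins, not storm-kafka's KafkaConfig.
final class OffsetSentinels {
    // Same values as kafka.api.OffsetRequest.EarliestTime() and LatestTime().
    static final long EARLIEST_TIME = -2L;
    static final long LATEST_TIME = -1L;

    private OffsetSentinels() {}

    // Names the end of the partition that a startOffsetTime sentinel refers to.
    static String describe(long startOffsetTime) {
        if (startOffsetTime == EARLIEST_TIME) {
            return "earliest";
        }
        if (startOffsetTime == LATEST_TIME) {
            return "latest";
        }
        return "timestamp " + startOffsetTime;
    }
}

class SketchKafkaConfig {
    // Mirrors the KafkaConfig default quoted above: start from the earliest offset.
    long startOffsetTime = OffsetSentinels.EARLIEST_TIME;
}

class SentinelDemo {
    public static void main(String[] args) {
        SketchKafkaConfig defaults = new SketchKafkaConfig();
        System.out.println(OffsetSentinels.describe(defaults.startOffsetTime)); // earliest

        // Overriding the field is how a user would opt into the latest offset instead.
        SketchKafkaConfig overridden = new SketchKafkaConfig();
        overridden.startOffsetTime = OffsetSentinels.LATEST_TIME;
        System.out.println(OffsetSentinels.describe(overridden.startOffsetTime)); // latest
    }
}
```

In a real topology the corresponding override would be config.startOffsetTime = kafka.api.OffsetRequest.LatestTime(); on the spout's KafkaConfig. Judging from the emitter code below, it only takes effect when no offset metadata is stored yet, or when ignoreZkOffsets is set and the topology has been redeployed.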
In the doEmitNewPartitionBatch method of storm.kafka.trident.TridentKafkaEmitter,
if there is no previously stored metadata (lastMeta is null, as is the case for a
new consumer group), the call falls through to the final else branch (line 109 in
the decompiled listing):
long offset;
if (lastMeta != null) {
    String lastInstanceId = null;
    Map lastTopoMeta = (Map) lastMeta.get("topology");
    if (lastTopoMeta != null) {
        lastInstanceId = (String) lastTopoMeta.get("id");
    }
    if (_config.ignoreZkOffsets && !_topologyInstanceId.equals(lastInstanceId)) {
        offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config.startOffsetTime);
    } else {
        offset = ((Long) lastMeta.get("nextOffset")).longValue();
    }
} else {
    // Line 109 in the decompiled listing: no stored metadata for this partition.
    offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config);
}
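The branch logic above can be modelled without Kafka at all. In this sketch, OffsetLookup is a hypothetical interface standing in for KafkaUtils.getOffset(consumer, topic, partition, time), and EmitterOffsetSketch.chooseOffset reproduces the three outcomes. The final fallback assumes the current getOffset(..., config), which simply uses config.startOffsetTime.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for KafkaUtils.getOffset(consumer, topic, partition, time):
// resolves a start-time sentinel to a concrete offset for one partition.
interface OffsetLookup {
    long resolve(long startOffsetTime);
}

final class EmitterOffsetSketch {
    private EmitterOffsetSketch() {}

    // Mirrors the branch structure of doEmitNewPartitionBatch quoted above.
    @SuppressWarnings("unchecked")
    static long chooseOffset(Map<String, Object> lastMeta, String topologyInstanceId,
                             boolean ignoreZkOffsets, long startOffsetTime, OffsetLookup lookup) {
        if (lastMeta != null) {
            String lastInstanceId = null;
            Map<String, Object> lastTopoMeta = (Map<String, Object>) lastMeta.get("topology");
            if (lastTopoMeta != null) {
                lastInstanceId = (String) lastTopoMeta.get("id");
            }
            if (ignoreZkOffsets && !topologyInstanceId.equals(lastInstanceId)) {
                // Redeployed topology with ignoreZkOffsets: restart from startOffsetTime.
                return lookup.resolve(startOffsetTime);
            }
            // Otherwise resume where the previous batch stopped.
            return (Long) lastMeta.get("nextOffset");
        }
        // No stored metadata (e.g. a brand-new consumer group): fall back to startOffsetTime.
        return lookup.resolve(startOffsetTime);
    }
}

class EmitterOffsetDemo {
    public static void main(String[] args) {
        // Pretend partition: earliest offset 0, latest (next to be written) 500.
        OffsetLookup lookup = t -> t == -2L ? 0L : 500L;
        long earliest = -2L; // OffsetRequest.EarliestTime()

        // New consumer group: nothing stored yet, so the default start time wins.
        System.out.println(EmitterOffsetSketch.chooseOffset(null, "topo-1", false, earliest, lookup)); // 0

        // Existing group: resume from the stored nextOffset.
        Map<String, Object> topo = new HashMap<>();
        topo.put("id", "topo-1");
        Map<String, Object> meta = new HashMap<>();
        meta.put("topology", topo);
        meta.put("nextOffset", 42L);
        System.out.println(EmitterOffsetSketch.chooseOffset(meta, "topo-1", false, earliest, lookup)); // 42
    }
}
```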
That branch calls the method below. As you can see, it uses config.startOffsetTime,
which defaults to EarliestTime(), so it fetches from the earliest offset rather
than the latest:
public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
    long startOffsetTime = config.startOffsetTime;
    return getOffset(consumer, topic, partition, startOffsetTime);
}
How it should be (this is how it was in the previous 0.9.x release):
public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
    long startOffsetTime = kafka.api.OffsetRequest.LatestTime();
    if (config.ignoreZkOffsets) {
        startOffsetTime = config.startOffsetTime;
    }
    return getOffset(consumer, topic, partition, startOffsetTime);
}
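To make the difference concrete, the sketch below models only which start-time sentinel each variant of getOffset(..., config) picks; the actual Kafka offset fetch is omitted, and StartTimeChoice is a hypothetical name. It assumes the default startOffsetTime = EarliestTime() and a partition with no stored metadata.

```java
// Self-contained comparison of the two getOffset(config) variants quoted above.
// Only the choice of start-time sentinel is modelled; the Kafka fetch itself is omitted.
final class StartTimeChoice {
    static final long EARLIEST_TIME = -2L; // OffsetRequest.EarliestTime()
    static final long LATEST_TIME = -1L;   // OffsetRequest.LatestTime()

    private StartTimeChoice() {}

    // Current behaviour: always use the configured startOffsetTime
    // (ignoreZkOffsets is accepted only to keep both signatures identical).
    static long current(boolean ignoreZkOffsets, long configuredStartOffsetTime) {
        return configuredStartOffsetTime;
    }

    // Proposed behaviour: default to latest unless ignoreZkOffsets is set.
    static long proposed(boolean ignoreZkOffsets, long configuredStartOffsetTime) {
        long startOffsetTime = LATEST_TIME;
        if (ignoreZkOffsets) {
            startOffsetTime = configuredStartOffsetTime;
        }
        return startOffsetTime;
    }

    public static void main(String[] args) {
        // New consumer group with the default config (startOffsetTime = earliest).
        System.out.println(current(false, EARLIEST_TIME));  // -2 (earliest)
        System.out.println(proposed(false, EARLIEST_TIME)); // -1 (latest)
        // With ignoreZkOffsets = true the proposed variant honours the configured value.
        System.out.println(proposed(true, EARLIEST_TIME));  // -2 (earliest)
    }
}
```

One trade-off of the proposed variant: startOffsetTime is ignored unless ignoreZkOffsets is also true, so a new consumer group explicitly configured with EarliestTime() would still start from the latest offset.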
This code was present earlier but somehow got removed. I tried searching on
GitHub but couldn't find the history of the change.
Thanks,
Sachin