Currently, if you supply a new consumer group, it starts reading data from the
earliest offset rather than the latest.
In KafkaConfig:
public long startOffsetTime = OffsetRequest.EarliestTime();
In the doEmitNewPartitionBatch API of storm.kafka.trident.TridentKafkaEmitter,
if there is no stored metadata for the consumer group (lastMeta is null), the
call goes to line 109:
           if (lastMeta != null) {
/*  98 */    String lastInstanceId = null;
/*  99 */    Map lastTopoMeta = (Map)lastMeta.get("topology");
/* 100 */    if (lastTopoMeta != null)
/* 101 */      lastInstanceId = (String)lastTopoMeta.get("id");
/*     */    long offset;
/* 103 */    if ((_config.ignoreZkOffsets) && (!_topologyInstanceId.equals(lastInstanceId))) {
/* 104 */      offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config.startOffsetTime);
/*     */    } else {
/* 106 */      offset = ((Long)lastMeta.get("nextOffset")).longValue();
/*     */    }
/*     */  } else {
/* 109 */    offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config);
/*     */  }
Line 109 calls the API below. As you can see, this call fetches the earliest
data rather than the latest, because it uses config.startOffsetTime, which
defaults to EarliestTime as shown above:
public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config)
{
    long startOffsetTime = config.startOffsetTime;
    return getOffset(consumer, topic, partition, startOffsetTime);
}
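To make the path easier to follow, here is a minimal, dependency-free sketch of the branch in doEmitNewPartitionBatch. It is not storm-kafka code: the class OffsetSourceSketch, the enum Source and the method choose are hypothetical names introduced only for illustration, and the sketch reports which branch would be taken instead of talking to Kafka.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the branch shown above; not part of storm-kafka.
final class OffsetSourceSketch {

    // Which of the three assignments to "offset" would run.
    enum Source {
        START_OFFSET_TIME,   // line 104: getOffset(..., _config.startOffsetTime)
        STORED_NEXT_OFFSET,  // line 106: lastMeta.get("nextOffset")
        CONFIG_DEFAULT       // line 109: getOffset(..., _config)
    }

    static Source choose(Map<String, Object> lastMeta,
                         boolean ignoreZkOffsets,
                         String topologyInstanceId) {
        if (lastMeta == null) {
            // No metadata from a previous batch: the getOffset(..., config) overload decides.
            return Source.CONFIG_DEFAULT;
        }
        String lastInstanceId = null;
        Object lastTopoMeta = lastMeta.get("topology");
        if (lastTopoMeta instanceof Map) {
            Object id = ((Map<?, ?>) lastTopoMeta).get("id");
            lastInstanceId = (id == null) ? null : id.toString();
        }
        if (ignoreZkOffsets && !topologyInstanceId.equals(lastInstanceId)) {
            return Source.START_OFFSET_TIME;
        }
        return Source.STORED_NEXT_OFFSET;
    }

    public static void main(String[] args) {
        // A fresh consumer group has no metadata, so the line 109 path is taken.
        System.out.println(choose(null, false, "topo-1"));

        Map<String, Object> topo = new HashMap<>();
        topo.put("id", "topo-1");
        Map<String, Object> meta = new HashMap<>();
        meta.put("topology", topo);
        meta.put("nextOffset", 42L);

        // Same topology instance: resume from the stored offset.
        System.out.println(choose(meta, true, "topo-1"));
        // Redeployed topology with ignoreZkOffsets set: use startOffsetTime.
        System.out.println(choose(meta, true, "topo-2"));
    }
}
```

The first case is the one this report is about: with no stored metadata, everything depends on what the getOffset(consumer, topic, partition, config) overload does with the config.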
How it should be (this is how it worked in the previous 0.9.x release):
public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
    long startOffsetTime = kafka.api.OffsetRequest.LatestTime();
    if (config.ignoreZkOffsets) {
        startOffsetTime = config.startOffsetTime;
    }
    return getOffset(consumer, topic, partition, startOffsetTime);
}
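The difference between the two versions of getOffset can be shown side by side without a Kafka dependency. The sketch below is only an illustration under stated assumptions: StartOffsetSketch, currentBehavior and proposedBehavior are hypothetical names, and the constants -1 and -2 are my understanding of what kafka.api.OffsetRequest.LatestTime() and EarliestTime() return, so please check them against the Kafka version in use.

```java
// Hypothetical, dependency-free comparison; not part of storm-kafka.
final class StartOffsetSketch {

    // Assumed values of kafka.api.OffsetRequest.LatestTime() / EarliestTime().
    static final long LATEST_TIME = -1L;
    static final long EARLIEST_TIME = -2L;

    // Mirrors the current overload: always uses the configured value.
    static long currentBehavior(boolean ignoreZkOffsets, long configuredStartOffsetTime) {
        return configuredStartOffsetTime;
    }

    // Mirrors the 0.9.x version: latest by default, configured value
    // only when ignoreZkOffsets is set.
    static long proposedBehavior(boolean ignoreZkOffsets, long configuredStartOffsetTime) {
        long startOffsetTime = LATEST_TIME;
        if (ignoreZkOffsets) {
            startOffsetTime = configuredStartOffsetTime;
        }
        return startOffsetTime;
    }

    public static void main(String[] args) {
        // Default config: startOffsetTime is EarliestTime, ignoreZkOffsets is false.
        System.out.println(currentBehavior(false, EARLIEST_TIME));   // prints -2
        System.out.println(proposedBehavior(false, EARLIEST_TIME));  // prints -1
        // With ignoreZkOffsets set, both versions honor the configured value.
        System.out.println(proposedBehavior(true, EARLIEST_TIME));   // prints -2
    }
}
```

With the default config, only the second variant starts a consumer group with no stored offsets at the latest offset, which is the behavior being asked for here.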
This code was present earlier but was somehow removed. I tried searching on
GitHub but didn't find the history of the change.
Thanks,
Sachin