Currently, if you start a topology with a new consumer group, it starts reading
data from the earliest offset rather than the latest.

In KafkaConfig, the default is:

public long startOffsetTime = OffsetRequest.EarliestTime();


In the doEmitNewPartitionBatch method of storm.kafka.trident.TridentKafkaEmitter,
if there is no previous metadata for the consumer group (lastMeta is null), the
call goes to line 109:

               if (lastMeta != null) {
/*  98 */       String lastInstanceId = null;
/*  99 */       Map lastTopoMeta = (Map)lastMeta.get("topology");
/* 100 */       if (lastTopoMeta != null)
/* 101 */         lastInstanceId = (String)lastTopoMeta.get("id");
/*     */       long offset;
/* 103 */       if ((_config.ignoreZkOffsets) && (!_topologyInstanceId.equals(lastInstanceId))) {
/* 104 */         offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config.startOffsetTime);
/*     */       } else {
/* 106 */         offset = ((Long)lastMeta.get("nextOffset")).longValue();
/*     */       }
/*     */     } else {
/* 109 */       offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config);
/*     */     }

That line calls the method below. As you can see, it always uses
config.startOffsetTime, which defaults to OffsetRequest.EarliestTime(), so it
fetches the earliest offset rather than the latest:

public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
    long startOffsetTime = config.startOffsetTime;
    return getOffset(consumer, topic, partition, startOffsetTime);
}



How it should be (this was present in the previous 0.9.x release):

public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
    long startOffsetTime = kafka.api.OffsetRequest.LatestTime();
    if (config.ignoreZkOffsets) {
        startOffsetTime = config.startOffsetTime;
    }
    return getOffset(consumer, topic, partition, startOffsetTime);
}
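
To make the difference concrete, here is a minimal, self-contained sketch that
compares the two versions of the offset-time selection. It does not use the real
storm-kafka classes: SketchConfig and OffsetTimeSketch are hypothetical stand-ins
I made up for illustration, and the two constants mirror the values returned by
kafka.api.OffsetRequest.LatestTime() (-1) and EarliestTime() (-2).

```java
// Hypothetical stand-in for storm.kafka.KafkaConfig; only the two fields used here.
class SketchConfig {
    // Same default as the KafkaConfig field quoted at the top of this mail.
    long startOffsetTime = OffsetTimeSketch.EARLIEST_TIME;
    boolean ignoreZkOffsets = false;
}

class OffsetTimeSketch {
    // Values returned by kafka.api.OffsetRequest.LatestTime() and EarliestTime().
    static final long LATEST_TIME = -1L;
    static final long EARLIEST_TIME = -2L;

    // Current behavior: always takes config.startOffsetTime.
    static long currentStartOffsetTime(SketchConfig config) {
        return config.startOffsetTime;
    }

    // 0.9.x behavior: latest by default, config.startOffsetTime only
    // when ignoreZkOffsets is set.
    static long previousStartOffsetTime(SketchConfig config) {
        long startOffsetTime = LATEST_TIME;
        if (config.ignoreZkOffsets) {
            startOffsetTime = config.startOffsetTime;
        }
        return startOffsetTime;
    }

    public static void main(String[] args) {
        SketchConfig config = new SketchConfig(); // default configuration
        System.out.println(currentStartOffsetTime(config));  // prints -2 (earliest)
        System.out.println(previousStartOffsetTime(config)); // prints -1 (latest)
    }
}
```

With a default configuration and no stored metadata, the current code resolves
to the earliest offset, while the 0.9.x logic resolved to the latest unless
ignoreZkOffsets was set.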



This code was present earlier, but somehow it got removed. I tried searching on
GitHub but didn't find the history of the change.

Thanks,

Sachin
