[Updated Subject]

In the 2nd case, since the user has changed group.id, there will be no history
for this id, so the code will automatically fall back to EARLIEST or LATEST.

I was able to code for it to some extent: I can fetch data from a given offset
(with something of a hack). What I have seen is that when I provide an offset,
it pulls the proper records. However,
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutManager.subscribeKafkaConsumer()
calls poll initially, which causes the offset to be updated to the latest. I
guess we need to work on that on the Kafka side so that the partition offset is
updated when the user has provided an offset. If that works, then we need only
minimal code to pass on the offset.

Below are the cases I have tried:

  *   Don't provide an offset: it behaves normally.
  *   Request an offset lower than the current one: it works, with the caveat
that it reads twice. The first read uses the offset that came from the
subscribeKafkaConsumer call, but the data is dropped because the requested and
actual positions differ. It then takes my provided value and works fine from
there.
  *   Request an offset higher than the current one: this is the case where I
got blocked, because we are not setting the expected offset initially in the
subscribeKafkaConsumer call, while my code keeps updating the offset to the
user-provided value. I can put in a hack, but I am not sure how it will behave
in all cases.
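
One possible way to make the third case well defined, sketched here only as an
idea, is to clamp the user-provided offset to the partition's valid range
before seeking. The class and method names (StartOffsets, clamp) are
hypothetical and not part of storm-kafka-client. The beginning and end offsets
are assumed to have been fetched beforehand, for example with
KafkaConsumer.beginningOffsets and KafkaConsumer.endOffsets in client versions
that provide them.

```java
// Hypothetical helper for illustration; not part of storm-kafka-client.
final class StartOffsets {
    private StartOffsets() { }

    /**
     * Clamps a user-requested offset into [beginning, end] for one partition.
     *
     * @param requested offset requested by the user
     * @param beginning earliest offset still available in the partition
     * @param end       offset the next produced record will receive
     * @return the requested offset if it is in range, otherwise the nearest bound
     */
    static long clamp(long requested, long beginning, long end) {
        if (beginning > end) {
            throw new IllegalArgumentException("beginning must not exceed end");
        }
        return Math.max(beginning, Math.min(requested, end));
    }
}
```

With this, a request beyond the end of the partition would start at the end
offset instead of leaving the requested and actual positions out of sync.
Whether clamping or failing fast is the better behaviour is an open design
choice.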

Below is the code I inserted in
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(TopicPartition,
 KafkaTridentSpoutBatchMetadata<K, V>), where startOffset is the offset value
provided by the user.

    if (startOffset != null && lastBatchMeta == null) {
        // seek offset provided by user
        kafkaConsumer.seek(tp, startOffset + 1);
        LOG.debug("Seeking fetch offset provided by user");
    } else if (lastBatchMeta != null) {
        // seek next offset after last offset from previous batch
        kafkaConsumer.seek(tp, lastBatchMeta.getLastOffset() + 1);
        LOG.debug("Seeking fetch offset to next offset after last offset from previous batch");
    }...
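
To make the precedence in that branch easier to check in isolation, here is a
minimal sketch of the same decision as a pure function with no Kafka
dependency. The class SeekDecision and the method nextSeekOffset are
hypothetical names used only for illustration. A null result stands for the
remaining branches, which are not shown above and are left to the existing
first-poll strategy.

```java
// Hypothetical model of the seek decision; not the actual KafkaTridentSpoutEmitter code.
final class SeekDecision {
    private SeekDecision() { }

    /**
     * Returns the offset to seek to, or null when neither input is available
     * and the caller should fall back to its first-poll offset strategy.
     *
     * @param startOffset         user-provided offset, or null if not configured
     * @param lastBatchLastOffset last offset of the previous batch, or null for the first batch
     */
    static Long nextSeekOffset(Long startOffset, Long lastBatchLastOffset) {
        if (lastBatchLastOffset != null) {
            // A previous batch exists: continue right after it and ignore the user offset.
            return lastBatchLastOffset + 1;
        }
        if (startOffset != null) {
            // First batch with a user offset: mirrors the "+ 1" used in the snippet above.
            return startOffset + 1;
        }
        return null;
    }
}
```

The point of the ordering is that the user-provided offset only affects the
first batch; once batch metadata exists, it always takes precedence.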

On 18/01/17, 9:30 PM, "Hugo Da Cruz Louro" <[email protected]> wrote:

Hi Sachin,

The 2nd case can likely be handled with the committed offset, which is covered by 
UNCOMMITTED_EARLIEST or UNCOMMITTED_LATEST.

The 1st case may make sense, but even if you give the start offset, since 
Kafka polls a certain number of bytes, and not specifically a number of 
records, it may not be trivial to guarantee that the same exact dataset is 
polled each time.

However, if we as a community agree that it is useful to support your proposed 
feature, I have no particular objection to doing so.

Best,
Hugo
PS. Our usual practice is to start discussion threads with the email subject 
prefixed with [DISCUSS].



On Jan 18, 2017, at 6:33 AM, Sachin Pasalkar <[email protected]> wrote:
Hi,
I was looking at the code of the current KafkaTridentSpoutEmitter & KafkaSpout 
classes. Can we add functionality to start from a particular user-provided 
offset? This would be useful in case the user wants to reprocess a particular 
data set. Another example: the user has changed the group id, knows where the 
old offset was committed, and wants to start processing from the same position.
Does this make sense? Or is it explicit that this will not be supported?
Thanks,
Sachin

