[Updated Subject]
In the 2nd case, since the user has changed the group.id, there is no committed
offset history for the new id, so the code automatically falls back to EARLIEST
or LATEST.
I was able to partially code this, so that data is fetched from a given offset
(with a bit of a hack). When I provide an offset, it pulls up the proper
records. However,
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutManager.subscribeKafkaConsumer()
calls poll() initially, which causes the offset to be updated to the latest. I
guess we need to work on that on the Kafka side, so that the partition offset
is updated when the user has provided one. If that works, only minimal code is
needed to pass the offset through.
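A rough, hypothetical model of the "poll first, then override" idea (ToyPartitionConsumer and SubscribeThenSeek are made-up names, not Storm or Kafka classes). It only mimics two real behaviors: in Kafka, seek() overrides the offset used by the next poll, and with subscribe() partitions are assigned during poll(), which is likely why an initial poll happens before any seek is possible. In a real consumer, ConsumerRebalanceListener.onPartitionsAssigned is the usual place to issue such a seek.

```java
/**
 * Toy stand-in for one partition's fetch position (hypothetical, not the Kafka API).
 * The first poll() with no position resets to the log end, as a new group.id
 * with a LATEST reset would; seek() overrides the position for the next fetch.
 */
final class ToyPartitionConsumer {
    private final long logEndOffset;
    private Long position; // null until the first poll() or seek()

    ToyPartitionConsumer(long logEndOffset) {
        this.logEndOffset = logEndOffset;
    }

    void poll() {
        if (position == null) {
            position = logEndOffset; // reset to LATEST because nothing is committed
        }
        // A real poll would also fetch records and advance the position.
    }

    void seek(long offset) {
        position = offset;
    }

    long position() {
        return position; // only call after poll() or seek()
    }
}

final class SubscribeThenSeek {
    private SubscribeThenSeek() {}

    /** Hypothetical flow: the subscribe-time poll, then a re-seek to the user's offset. */
    static long startPosition(ToyPartitionConsumer consumer, Long userOffset) {
        consumer.poll(); // plays the role of the initial poll in subscribeKafkaConsumer()
        if (userOffset != null) {
            consumer.seek(userOffset); // undo the reset done by the initial poll
        }
        return consumer.position();
    }
}
```

Without a user offset the position stays at the log end; with one, the explicit seek wins.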
Below are the cases I have tried:
* Don't provide an offset: it behaves normally.
* Request a smaller offset than the current one: it works, with a caveat. It
reads twice: first with the offset that came from the subscribeKafkaConsumer
call, but it drops that data because the requested and actual positions
differ. Then it takes my provided value and works fine from there on.
* Request a bigger offset than the current one: this is where I got blocked,
because we do not set the expected offset initially in the
subscribeKafkaConsumer call, while my code keeps updating the offset to the
user-provided value. I can put in a hack, but I am not sure how it will behave
in all cases.
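One possible way to handle the three cases, sketched under assumptions: compare the requested offset with the range the partition currently holds before seeking. StartOffsetCheck and its Outcome values are hypothetical; in a real consumer the bounds could come from KafkaConsumer.beginningOffsets(...) and KafkaConsumer.endOffsets(...). Seeking exactly to the end offset is valid (the consumer just waits for new records), whereas an offset past the end makes the fetch out of range, at which point Kafka applies its reset policy.

```java
/**
 * Hypothetical validation of a user-supplied start offset for one partition.
 * beginningOffset: first offset still retained; endOffset: offset of the next record to be written.
 */
final class StartOffsetCheck {
    enum Outcome { USE_AS_IS, BELOW_RETAINED_RANGE, BEYOND_LOG_END }

    private StartOffsetCheck() {}

    static Outcome classify(long requested, long beginningOffset, long endOffset) {
        if (requested < beginningOffset) {
            // Those records were already removed by retention.
            return Outcome.BELOW_RETAINED_RANGE;
        }
        if (requested > endOffset) {
            // The "bigger number than current offset" case: no such record exists yet.
            return Outcome.BEYOND_LOG_END;
        }
        // Includes requested == endOffset: valid, the next fetch simply waits for data.
        return Outcome.USE_AS_IS;
    }
}
```

The caller could then fail fast, clamp, or fall back to the first-poll strategy for the out-of-range outcomes; which of these is right is a design decision for the feature.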
Below is the code I inserted in
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(TopicPartition,
KafkaTridentSpoutBatchMetadata<K, V>), where startOffset is the offset value
provided by the user:
    if (startOffset != null && lastBatchMeta == null) {
        // seek to the offset right after the one provided by the user
        kafkaConsumer.seek(tp, startOffset + 1);
        LOG.debug("Seeking fetch offset provided by user");
    } else if (lastBatchMeta != null) {
        // seek to the next offset after the last offset from the previous batch
        kafkaConsumer.seek(tp, lastBatchMeta.getLastOffset() + 1);
        LOG.debug("Seeking fetch offset to next offset after last offset from previous batch");
    }...
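The branch order above can be restated as a pure function, which makes the precedence easy to test. SeekTarget is a hypothetical name; the real method works on a KafkaConsumer and KafkaTridentSpoutBatchMetadata, and the elided remainder of the snippet is represented here only by returning null.

```java
/**
 * Hypothetical restatement of the seek() branches: returns the offset to seek to,
 * or null when neither source applies (the elided remainder of the method decides).
 */
final class SeekTarget {
    private SeekTarget() {}

    static Long choose(Long userStartOffset, Long lastBatchLastOffset) {
        if (lastBatchLastOffset != null) {
            // Previous batch metadata wins, so a restart resumes where the topology
            // left off instead of jumping back to the user offset.
            return lastBatchLastOffset + 1;
        }
        if (userStartOffset != null) {
            // Same "+ 1" as the snippet: the user value is treated as the last
            // offset already processed, so that record itself is skipped.
            return userStartOffset + 1;
        }
        return null;
    }
}
```

Note the "+ 1" on the user offset: if users are expected to pass the first offset they want to read (the way Kafka's committed offsets are defined), that branch would seek to userStartOffset instead.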
On 18/01/17, 9:30 PM, "Hugo Da Cruz Louro" <[email protected]>
wrote:
Hi Sachin,
The 2nd case can likely be handled with the committed offset, which is covered
by UNCOMMITTED_EARLIEST or UNCOMMITTED_LATEST.
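A simplified model of the four first-poll strategies discussed in this thread may help compare the two cases. StartStrategy and StartOffsetResolver are hypothetical names; the rules follow the documented storm-kafka-client FirstPollOffsetStrategy semantics in simplified form, and only the first poll after deployment is modeled. With a new group.id there is no committed offset, so the UNCOMMITTED_* values fall back to the earliest or latest offset, as in the 2nd case; when a committed offset exists, they resume from it.

```java
import java.util.OptionalLong;

/** Mirrors the names of the storm-kafka-client strategies; the logic is a simplified model. */
enum StartStrategy { EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST }

final class StartOffsetResolver {
    private StartOffsetResolver() {}

    /**
     * @param committed offset committed for this group.id, empty if the group has no history
     * @param earliest  first offset still retained in the partition
     * @param latest    log-end offset (offset of the next record to be written)
     */
    static long resolve(StartStrategy strategy, OptionalLong committed, long earliest, long latest) {
        switch (strategy) {
            case EARLIEST:
                return earliest; // ignores any committed offset
            case LATEST:
                return latest; // ignores any committed offset
            case UNCOMMITTED_EARLIEST:
                return committed.isPresent() ? committed.getAsLong() : earliest;
            case UNCOMMITTED_LATEST:
                return committed.isPresent() ? committed.getAsLong() : latest;
            default:
                throw new IllegalArgumentException("Unknown strategy: " + strategy);
        }
    }
}
```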
The 1st case may make sense, but even if you give the start offset, Kafka polls
a certain number of bytes rather than a specific number of records, so it may
not be trivial to guarantee that exactly the same dataset is polled each time.
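A toy illustration of this point, under stated assumptions (ByteBoundedFetch is hypothetical and far simpler than Kafka's real fetch logic): when a fetch is bounded by bytes, the number of records returned from the same start offset depends on record sizes and the byte limit. One way to pin a replay to an exact dataset is to filter fetched records by a stored offset range, such as the first and last offsets kept in batch metadata.

```java
import java.util.ArrayList;
import java.util.List;

final class ByteBoundedFetch {
    private ByteBoundedFetch() {}

    /**
     * Offsets returned by one toy fetch starting at startOffset: stops once the next
     * record would exceed maxBytes, but always returns at least one record so the
     * toy makes progress even when a single record is larger than maxBytes.
     */
    static List<Long> fetch(int[] recordSizes, long startOffset, int maxBytes) {
        List<Long> offsets = new ArrayList<>();
        int used = 0;
        for (long offset = startOffset; offset < recordSizes.length; offset++) {
            int size = recordSizes[(int) offset];
            if (!offsets.isEmpty() && used + size > maxBytes) {
                break;
            }
            offsets.add(offset);
            used += size;
        }
        return offsets;
    }

    /** Keeps only offsets in [firstOffset, lastOffset], e.g. taken from stored batch metadata. */
    static List<Long> clampToBatch(List<Long> offsets, long firstOffset, long lastOffset) {
        List<Long> result = new ArrayList<>();
        for (long offset : offsets) {
            if (offset >= firstOffset && offset <= lastOffset) {
                result.add(offset);
            }
        }
        return result;
    }
}
```

With record sizes {100, 100, 300, 50, 50}, a 250-byte limit yields offsets 0 and 1, while a 600-byte limit yields all five; clamping the larger fetch to [1, 3] gives the same three records regardless of the limit.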
However, if we as a community agree that it is useful to support your proposed
feature, I have no particular argument against it.
Best,
Hugo
PS. We usually start discussion threads with the email subject prefixed with
[DISCUSS].
On Jan 18, 2017, at 6:33 AM, Sachin Pasalkar <[email protected]>
wrote:
Hi,
I was looking at the code of the current KafkaTridentSpoutEmitter and
KafkaSpout classes. Can we add functionality to start from a particular offset
provided by the user? This would be useful in case the user wants to reprocess
a particular data set. Another example: the user has changed the group id,
knows where the old offset was committed, and wants to start processing from
the same position.
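Since Kafka offsets are only meaningful per partition, a user-provided start offset would likely have to be given for each topic partition. Below is a hypothetical shape for such an option (UserStartOffsets and its Builder are illustrative only, not an existing Storm config); the "topic-partition" key format matches what Kafka's TopicPartition.toString() produces.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/** Hypothetical per-partition start offsets supplied by the user. */
final class UserStartOffsets {
    private final Map<String, Long> offsets;

    private UserStartOffsets(Map<String, Long> offsets) {
        this.offsets = Collections.unmodifiableMap(new HashMap<>(offsets));
    }

    private static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    /** Empty when the user gave no offset for this partition, so the first-poll strategy decides. */
    OptionalLong offsetFor(String topic, int partition) {
        Long offset = offsets.get(key(topic, partition));
        return offset == null ? OptionalLong.empty() : OptionalLong.of(offset);
    }

    static final class Builder {
        private final Map<String, Long> offsets = new HashMap<>();

        Builder startAt(String topic, int partition, long offset) {
            if (offset < 0) {
                throw new IllegalArgumentException("offset must be >= 0, got " + offset);
            }
            offsets.put(key(topic, partition), offset);
            return this;
        }

        UserStartOffsets build() {
            return new UserStartOffsets(offsets);
        }
    }
}
```

For the group id example, the values passed to startAt would be the offsets the old group had committed, one per partition.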
Does this make sense? Or is it intentionally not supported?
Thanks,
Sachin