[
https://issues.apache.org/jira/browse/STORM-3279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16687429#comment-16687429
]
Janith Kaiprath Valiyalappil edited comment on STORM-3279 at 11/15/18 3:08 AM:
-------------------------------------------------------------------------------
[~Srdo], I think your solution is better than what I had working. Just a small
comment.
I was trying to pass a signal back from seek, but I guess I should have thought
about reading the position straight from the consumer.
https://gist.github.com/janithkv/a5b5e90db9e35d0e370408de0d7f2a72
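For illustration, the "read straight from the consumer" approach can be sketched with a toy Java model. Everything in it is hypothetical and not the code in the gist above. FakeConsumer, BatchMeta, Emitter, the firstPollTxid bookkeeping, and the empty-batch encoding (lastOffset = firstOffset - 1) are all assumptions, not Storm or Kafka classes. FakeConsumer.position() plays the role of Kafka's KafkaConsumer.position(TopicPartition), and only the LATEST strategy is modeled. After seeking, the emitter asks the consumer where it is rather than having the seek step pass a flag back. As a result, an empty first poll still produces metadata that points at the new position instead of falling back to lastBatchMeta.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: read the post-seek position straight from the consumer.
// Toy model (LATEST strategy only), not the Storm/Kafka API or the gist's code.
class Storm3279FixSketch {

    /** Batch offsets; an empty batch is encoded as lastOffset = firstOffset - 1. */
    static final class BatchMeta {
        final long firstOffset;
        final long lastOffset; // the next batch resumes at lastOffset + 1
        BatchMeta(long firstOffset, long lastOffset) {
            this.firstOffset = firstOffset;
            this.lastOffset = lastOffset;
        }
    }

    /** Single-partition consumer over offsets [0, endOffset). */
    static final class FakeConsumer {
        long endOffset;
        long position;
        FakeConsumer(long endOffset) { this.endOffset = endOffset; }
        void append(int count) { endOffset += count; } // new records arrive
        void seekToEnd() { position = endOffset; }
        void seek(long offset) { position = offset; }
        long position() { return position; }            // next offset to fetch
        /** Returns up to maxRecords offsets starting at the current position. */
        List<Long> poll(int maxRecords) {
            List<Long> records = new ArrayList<>();
            while (position < endOffset && records.size() < maxRecords) {
                records.add(position++);
            }
            return records;
        }
    }

    static final class Emitter {
        final FakeConsumer consumer;
        Long firstPollTxid; // assumed: the first txid seen since restart
        Emitter(FakeConsumer consumer) { this.consumer = consumer; }

        boolean isFirstPoll(long txid) {
            if (firstPollTxid == null) firstPollTxid = txid;
            return firstPollTxid == txid;
        }

        BatchMeta emitPartitionBatch(long txid, BatchMeta lastBatchMeta) {
            boolean firstPoll = isFirstPoll(txid);
            if (firstPoll) {
                consumer.seekToEnd();                        // LATEST
            } else {
                consumer.seek(lastBatchMeta.lastOffset + 1); // resume after last batch
            }
            // Ask the consumer where the seek left it, instead of having
            // the seek step pass a signal back.
            long fetchOffset = consumer.position();
            List<Long> records = consumer.poll(10);
            if (!records.isEmpty()) {
                return new BatchMeta(records.get(0), records.get(records.size() - 1));
            }
            if (firstPoll) {
                // Empty first poll: remember the post-seek position so the
                // next transaction resumes there, not at lastBatchMeta.
                return new BatchMeta(fetchOffset, fetchOffset - 1);
            }
            return lastBatchMeta; // nothing new since the last batch
        }
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer(100); // offsets 0..99 exist
        Emitter emitter = new Emitter(consumer);
        BatchMeta txid1 = emitter.emitPartitionBatch(1, new BatchMeta(10, 19));
        consumer.append(5); // five new records arrive after txid1
        BatchMeta txid2 = emitter.emitPartitionBatch(2, txid1);
        System.out.println("txid1 next offset: " + (txid1.lastOffset + 1));
        System.out.println("txid2: " + txid2.firstOffset + "-" + txid2.lastOffset);
    }
}
```

Running main prints `txid1 next offset: 100` and `txid2: 100-104`. The records appended after the restart are the first ones emitted, rather than the old offsets after 19.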
was (Author: janithkv):
[~Srdo], I think your solution is better than what I had working. Just a small
comment.
I was trying to pass a signal back from seek. But I guess I should have thought
to read straight from the consumer. But this works too; I tested it.
https://gist.github.com/janithkv/a5b5e90db9e35d0e370408de0d7f2a72
> Kafka trident spout could lose its position with EARLIEST or LATEST
> FirstPollOffsetStrategy
> --------------------------------------------------------------------------------------------
>
> Key: STORM-3279
> URL: https://issues.apache.org/jira/browse/STORM-3279
> Project: Apache Storm
> Issue Type: Bug
> Components: trident
> Affects Versions: 2.0.1
> Reporter: Janith Kaiprath Valiyalappil
> Assignee: Janith Kaiprath Valiyalappil
> Priority: Major
> Labels: kafka, trident
>
> In the KafkaTridentSpoutEmitter.emitPartitionBatch() function, when
> kafkaConsumer.poll(pollTimeoutMs) returns 0 records for the very first
> transaction and FirstPollOffsetStrategy is set to EARLIEST or LATEST, the
> spout fails to move to the earliest or latest offset. Instead, it continues
> from the position recorded in the last batch metadata.
>
> The sequence of events that causes this bug:
>
> 1. FirstPollOffsetStrategy set to EARLIEST or LATEST
> 2. For the first transaction after a restart (txid1), currentBatch is
> initialized to lastBatchMeta, which need not be null (see
> [link L164|https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java#L164]).
> 3. Later, at L171, the consumer seeks to the start (EARLIEST) or the end
> (LATEST) of the partition.
> 4. Then consumer.poll(pollTimeoutMs) is called.
> 5. If poll returns one or more records, currentBatch is set to new metadata.
> *If poll returns 0 records, currentBatch is not reset, i.e., currentBatch is*
> *still lastBatchMeta (which need not be null).*
>
> As a result, in the next transaction (txid2), isFirstPoll() returns false, and
> the spout continues from lastBatchMeta instead of from the EARLIEST or LATEST
> position.
>
>
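The five steps above can be reproduced with a small, self-contained Java model. It is a sketch under simplifying assumptions. FakeConsumer, BatchMeta, Emitter, and the firstPollTxid bookkeeping are hypothetical stand-ins, not the real Kafka or Storm classes. Only the LATEST strategy is modeled, and the 10-record poll limit is arbitrary. The partition holds offsets 0..99, and metadata for offsets 10-19 survives from the previous run. txid1 seeks to the end, polls nothing, and returns the stale metadata, so txid2 resumes at offset 20 instead of at the latest offset (100).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the STORM-3279 flow (LATEST strategy only).
// FakeConsumer, BatchMeta and Emitter are illustrative stand-ins,
// not the real Kafka or Storm classes.
class Storm3279Repro {

    /** Offsets covered by one batch (stand-in for the spout's batch metadata). */
    static final class BatchMeta {
        final long firstOffset;
        final long lastOffset;
        BatchMeta(long firstOffset, long lastOffset) {
            this.firstOffset = firstOffset;
            this.lastOffset = lastOffset;
        }
    }

    /** Single-partition consumer over offsets [0, endOffset). */
    static final class FakeConsumer {
        final long endOffset;
        long position;
        FakeConsumer(long endOffset) { this.endOffset = endOffset; }
        void seekToEnd() { position = endOffset; }
        void seek(long offset) { position = offset; }
        /** Returns up to maxRecords offsets starting at the current position. */
        List<Long> poll(int maxRecords) {
            List<Long> records = new ArrayList<>();
            while (position < endOffset && records.size() < maxRecords) {
                records.add(position++);
            }
            return records;
        }
    }

    /** Mirrors steps 2-5 of the report. */
    static final class Emitter {
        final FakeConsumer consumer;
        Long firstPollTxid; // assumed: the first txid seen since restart
        Emitter(FakeConsumer consumer) { this.consumer = consumer; }

        boolean isFirstPoll(long txid) {
            if (firstPollTxid == null) firstPollTxid = txid;
            return firstPollTxid == txid;
        }

        BatchMeta emitPartitionBatch(long txid, BatchMeta lastBatchMeta) {
            BatchMeta currentBatch = lastBatchMeta;          // step 2
            if (isFirstPoll(txid)) {
                consumer.seekToEnd();                         // step 3 (LATEST)
            } else {
                consumer.seek(lastBatchMeta.lastOffset + 1);  // resume after last batch
            }
            List<Long> records = consumer.poll(10);           // step 4
            if (!records.isEmpty()) {                          // step 5
                currentBatch = new BatchMeta(records.get(0), records.get(records.size() - 1));
            }
            // Bug: with 0 records, currentBatch is still lastBatchMeta.
            return currentBatch;
        }
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer(100);   // offsets 0..99 exist
        Emitter emitter = new Emitter(consumer);
        BatchMeta beforeRestart = new BatchMeta(10, 19); // metadata from the previous run

        BatchMeta txid1 = emitter.emitPartitionBatch(1, beforeRestart);
        BatchMeta txid2 = emitter.emitPartitionBatch(2, txid1);
        System.out.println("txid1: " + txid1.firstOffset + "-" + txid1.lastOffset);
        System.out.println("txid2: " + txid2.firstOffset + "-" + txid2.lastOffset);
    }
}
```

Running main prints `txid1: 10-19` and `txid2: 20-29`. The spout replays old data instead of starting at the LATEST offset.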
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)