[ https://issues.apache.org/jira/browse/STORM-3145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jungtaek Lim closed STORM-3145.
-------------------------------
    Resolution: Won't Fix

Please send any question to the user mailing list, u...@storm.apache.org

See http://storm.apache.org/getting-help.html for details on accessing the
mailing list.

> UNCOMMITTED_EARLIEST and UNCOMMITTED_LATEST doesn't work as expected
> --------------------------------------------------------------------
>
>                 Key: STORM-3145
>                 URL: https://issues.apache.org/jira/browse/STORM-3145
>             Project: Apache Storm
>          Issue Type: Question
>          Components: storm-kafka-client, trident
>            Reporter: Prakash N M
>            Priority: Blocker
>              Labels: storm-kafka-client, trident
>
> I am using storm-kafka-client 1.2.2.
> When I ran my topology, I got the NPE described in
> https://issues.apache.org/jira/browse/STORM-3046.
> So I modified KafkaTridentSpoutEmitter.java#seek with the patch mentioned in
> STORM-3046. The modified code is below.
> {noformat}
> private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMeta, long transactionId) {
>     if (isFirstPoll(tp, transactionId)) {
>         if (firstPollOffsetStrategy == EARLIEST) {
>             LOG.debug("First poll for topic partition [{}], seeking to partition beginning", tp);
>             kafkaConsumer.seekToBeginning(Collections.singleton(tp));
>         } else if (firstPollOffsetStrategy == LATEST) {
>             LOG.debug("First poll for topic partition [{}], seeking to partition end", tp);
>             kafkaConsumer.seekToEnd(Collections.singleton(tp));
>         } else if (lastBatchMeta != null) {
>             LOG.debug("First poll for topic partition [{}], using last batch metadata", tp);
>             kafkaConsumer.seek(tp, lastBatchMeta.getLastOffset() + 1); // seek next offset after last offset from previous batch
>         } else if (firstPollOffsetStrategy == UNCOMMITTED_EARLIEST) {
>             LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition beginning", tp);
>             kafkaConsumer.seekToBeginning(Collections.singleton(tp));
>         } else if (firstPollOffsetStrategy == UNCOMMITTED_LATEST) {
>             LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition end", tp);
>             kafkaConsumer.seekToEnd(Collections.singleton(tp));
>         }
>         firstPollTransaction.put(tp, transactionId);
>     } else if (lastBatchMeta != null) {
>         kafkaConsumer.seek(tp, lastBatchMeta.getLastOffset() + 1); // seek next offset after last offset from previous batch
>         LOG.debug("First poll for topic partition [{}], using last batch metadata", tp);
>     } else {
>         long initialFetchOffset = firstPollTransaction.get(tp);
>         OffsetAndMetadata lastCommittedOffset = kafkaConsumer.committed(tp);
>         kafkaConsumer.seek(tp, lastCommittedOffset.offset());
>         LOG.debug("First poll for topic partition [{}], no last batch metadata present."
>                 + " Using stored initial fetch offset [{}]", tp, initialFetchOffset);
>     }
>     final long fetchOffset = kafkaConsumer.position(tp);
>     LOG.debug("Set [fetchOffset = {}] for partition [{}]", fetchOffset, tp);
>     return fetchOffset;
> }
> {noformat}
> After the code change, when the offset strategy is UNCOMMITTED_EARLIEST and the
> spout polls for the very first time, lastBatchMeta is null, so the Kafka
> consumer seeks to the beginning offset. Should I be checking for the last
> committed offset and starting from there instead? Likewise, on the next fetch
> (when lastBatchMeta is not null), the Kafka consumer seeks to
> lastBatchMeta.getLastOffset() + 1. Should I do the same here, checking for the
> last committed offset and starting from there?
>  
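
For reference, the branch order of the quoted seek(...) can be modelled as a small, Kafka-free Java function. This is a hypothetical sketch, not Storm's API: SeekModel, Strategy and resolve are illustrative names, offsets are passed in as plain numbers instead of being read from a KafkaConsumer, and an empty "committed" value stands in for kafkaConsumer.committed(tp) returning null. It only mirrors the quoted code and makes visible where the committed offset is (and is not) consulted.

```java
import java.util.OptionalLong;

// Hypothetical, Kafka-free model of the branch order in the quoted seek(...).
// SeekModel, Strategy and resolve are illustrative names, not Storm APIs.
public class SeekModel {

    // Mirrors the four strategy names used in the quoted code.
    enum Strategy { EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST }

    /**
     * Returns the offset the consumer would be positioned at after seek(...).
     *
     * @param lastBatchLastOffset lastBatchMeta.getLastOffset(), or empty when lastBatchMeta == null
     * @param committed           committed(tp).offset(), or empty when committed(tp) returns null
     */
    static long resolve(Strategy strategy, boolean firstPoll, OptionalLong lastBatchLastOffset,
                        OptionalLong committed, long beginning, long end) {
        if (firstPoll) {
            if (strategy == Strategy.EARLIEST) {
                return beginning;                           // seekToBeginning
            } else if (strategy == Strategy.LATEST) {
                return end;                                 // seekToEnd
            } else if (lastBatchLastOffset.isPresent()) {
                return lastBatchLastOffset.getAsLong() + 1; // next offset after previous batch
            } else if (strategy == Strategy.UNCOMMITTED_EARLIEST) {
                return beginning;                           // committed offset is not consulted
            } else {
                return end;                                 // UNCOMMITTED_LATEST
            }
        }
        if (lastBatchLastOffset.isPresent()) {
            return lastBatchLastOffset.getAsLong() + 1;
        }
        // Only this branch reads the committed offset; the quoted code would throw
        // a NullPointerException here if committed(tp) returned null.
        if (!committed.isPresent()) {
            throw new IllegalStateException("no committed offset for partition");
        }
        return committed.getAsLong();
    }

    public static void main(String[] args) {
        // First poll, UNCOMMITTED_EARLIEST, no lastBatchMeta, committed offset 42 exists.
        System.out.println(resolve(Strategy.UNCOMMITTED_EARLIEST, true,
                OptionalLong.empty(), OptionalLong.of(42), 0, 100));
        // Later poll where the previous batch ended at offset 57.
        System.out.println(resolve(Strategy.UNCOMMITTED_EARLIEST, false,
                OptionalLong.of(57), OptionalLong.of(42), 0, 100));
    }
}
```

Running main prints 0 and then 58. In the first call the committed offset 42 is ignored, because with this branch order a first poll under UNCOMMITTED_EARLIEST with no lastBatchMeta goes straight to the partition beginning. The committed offset is read only on a non-first poll without lastBatchMeta.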



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
