Github user janithkv commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2911#discussion_r234878772
  
    --- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
 ---
    @@ -225,7 +229,23 @@ private void emitTuple(TridentCollector collector, 
ConsumerRecord<K, V> record)
          */
         private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata 
lastBatchMeta) {
             if (isFirstPoll(tp)) {
    -            if (firstPollOffsetStrategy == EARLIEST) {
    +            if(firstPollOffsetStrategy.equals(TIMESTAMP)) {
    +                Long startTimeStampOffset = null;
    +                try {
    +                    startTimeStampOffset =
    +                            
consumer.offsetsForTimes(Collections.singletonMap(tp, 
startTimeStamp)).get(tp).offset();
    +                } catch (IllegalArgumentException e) {
    +                    LOG.error("Illegal timestamp {} provided for tp {} 
",startTimeStamp,tp.toString());
    +                } catch (UnsupportedVersionException e) {
    +                    LOG.error("Kafka Server do not support 
offsetsForTimes(), probably < 0.10.1",e);
    +                }
    +                if(startTimeStampOffset != null) {
    +                    LOG.debug("First poll for topic partition [{}], 
seeking to partition from startTimeStamp [{}]", tp , startTimeStamp);
    +                    consumer.seek(tp, startTimeStampOffset);
    +                } else {
    +                    LOG.info("Kafka consumer offset reset by timestamp 
failed for TopicPartition {}, TimeStamp {}, Offset {}. Restart with a different 
Strategy ", tp, startTimeStamp, startTimeStampOffset);
    --- End diff --
    
    ok will change


---

Reply via email to