GitHub user janithkv commented on a diff in the pull request:
https://github.com/apache/storm/pull/2911#discussion_r234878772
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
---
@@ -225,7 +229,23 @@ private void emitTuple(TridentCollector collector,
ConsumerRecord<K, V> record)
*/
private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata
lastBatchMeta) {
if (isFirstPoll(tp)) {
- if (firstPollOffsetStrategy == EARLIEST) {
+ if(firstPollOffsetStrategy.equals(TIMESTAMP)) {
+ Long startTimeStampOffset = null;
+ try {
+ startTimeStampOffset =
+
consumer.offsetsForTimes(Collections.singletonMap(tp,
startTimeStamp)).get(tp).offset();
+ } catch (IllegalArgumentException e) {
+ LOG.error("Illegal timestamp {} provided for tp {}
",startTimeStamp,tp.toString());
+ } catch (UnsupportedVersionException e) {
+ LOG.error("Kafka Server do not support
offsetsForTimes(), probably < 0.10.1",e);
+ }
+ if(startTimeStampOffset != null) {
+ LOG.debug("First poll for topic partition [{}],
seeking to partition from startTimeStamp [{}]", tp , startTimeStamp);
+ consumer.seek(tp, startTimeStampOffset);
+ } else {
+ LOG.info("Kafka consumer offset reset by timestamp
failed for TopicPartition {}, TimeStamp {}, Offset {}. Restart with a different
Strategy ", tp, startTimeStamp, startTimeStampOffset);
--- End diff --
OK, will change.
---