GEARPUMP-17, fix KafkaStorage lookup timestamp
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/678a5096 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/678a5096 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/678a5096 Branch: refs/heads/master Commit: 678a5096d02cdd770326ab246e23116c5701e852 Parents: 4bde18c Author: manuzhang <[email protected]> Authored: Wed Apr 6 16:57:46 2016 +0800 Committer: manuzhang <[email protected]> Committed: Tue Apr 26 14:26:52 2016 +0800 ---------------------------------------------------------------------- .../scala/io/gearpump/streaming/kafka/KafkaStorage.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/678a5096/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaStorage.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaStorage.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaStorage.scala index c768253..eacd267 100644 --- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaStorage.scala +++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaStorage.scala @@ -94,7 +94,13 @@ class KafkaStorage private[kafka]( } } - + /** + * offsets with timestamp < `time` have already been processed by the system + * so we look up the storage for the first offset with timestamp >= `time` on replay + * + * @param time the timestamp to look up for the earliest unprocessed offset + * @return the earliest unprocessed offset if `time` is in the range, otherwise failure + */ override def lookUp(time: TimeStamp): Try[Array[Byte]] = { if (dataByTime.isEmpty) { Failure(StorageEmpty) @@ -106,7 +112,7 @@ class KafkaStorage private[kafka]( } else if (time > max._1) { Failure(Overflow(max._2)) } else { - Success(dataByTime.reverse.find(_._1 <= time).get._2) + Success(dataByTime.find(_._1 >= time).get._2) } } }
