Updated Branches: refs/heads/master ae7801707 -> a268b7e0b
SAMZA-86; Convert GetOffset.getNextOffset to use Option instead of null when no offset is available. Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/a268b7e0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/a268b7e0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/a268b7e0 Branch: refs/heads/master Commit: a268b7e0ba763d639fb01513fdc39ac5a4ef1edd Parents: ae78017 Author: Rekha Joshi <[email protected]> Authored: Tue Dec 10 12:38:31 2013 -0800 Committer: Chris Riccomini <[email protected]> Committed: Tue Dec 10 12:38:31 2013 -0800 ---------------------------------------------------------------------- .../scala/org/apache/samza/system/kafka/BrokerProxy.scala | 2 +- .../scala/org/apache/samza/system/kafka/GetOffset.scala | 4 ++-- .../apache/samza/system/kafka/KafkaSystemConsumer.scala | 2 +- .../org/apache/samza/system/kafka/TestBrokerProxy.scala | 10 +++++----- 4 files changed, 9 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/a268b7e0/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala index 7db32c0..45a0ce9 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala @@ -77,7 +77,7 @@ abstract class BrokerProxy( sc } - def addTopicPartition(tp: TopicAndPartition, lastCheckpointedOffset: String) = { + def addTopicPartition(tp: TopicAndPartition, lastCheckpointedOffset: Option[String]) = { debug("Adding new topic and partition %s to queue for %s" format (tp, host)) if (nextOffsets.containsKey(tp)) toss("Already consuming TopicPartition %s" format tp) http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/a268b7e0/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala index 7ad5435..f69e772 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala @@ -87,7 +87,7 @@ class GetOffset(default: String, autoOffsetResetTopics: Map[String, String] = Ma * @param lastCheckpointedOffset Null is acceptable. If not null, return the last checkpointed offset, after checking it is valid * @return Next offset to read or throw an exception if one has been received via the simple consumer */ - def getNextOffset(sc: DefaultFetchSimpleConsumer, tp: TopicAndPartition, lastCheckpointedOffset: String): Long = { + def getNextOffset(sc: DefaultFetchSimpleConsumer, tp: TopicAndPartition, lastCheckpointedOffset: Option[String]): Long = { val offsetRequest = new OffsetRequest(Map(tp -> new PartitionOffsetRequestInfo(getAutoOffset(tp.topic), 1))) val offsetResponse = sc.getOffsetsBefore(offsetRequest) val partitionOffsetResponse = offsetResponse.partitionErrorAndOffsets.get(tp).getOrElse(toss("Unable to find offset information for %s" format tp)) @@ -96,7 +96,7 @@ class GetOffset(default: String, autoOffsetResetTopics: Map[String, String] = Ma val autoOffset = partitionOffsetResponse.offsets.headOption.getOrElse(toss("Got response, but no offsets defined for %s" format tp)) - val actualOffset = Option(lastCheckpointedOffset) match { + val actualOffset = lastCheckpointedOffset match { case Some(last) => useLastCheckpointedOffset(sc, last, tp).getOrElse(autoOffset) case None => autoOffset } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/a268b7e0/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala index 2b73c61..5dbcd94 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala @@ -118,7 +118,7 @@ private[kafka] class KafkaSystemConsumer( val messageSink: MessageSink = sink }) - brokerProxy.addTopicPartition(head, lastOffset) + brokerProxy.addTopicPartition(head, Option(lastOffset)) case None => warn("No such topic-partition: %s, dropping." format head) } rest http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/a268b7e0/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala index 151c699..89fd3ac 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala @@ -162,9 +162,9 @@ class TestBrokerProxy extends Logging { val (bp, tp, sink) = getMockBrokerProxy() bp.start - bp.addTopicPartition(tp, "0") + bp.addTopicPartition(tp, Option("0")) // Add tp2, which should never receive messages since sink disables it. - bp.addTopicPartition(tp2, "0") + bp.addTopicPartition(tp2, Option("0")) Thread.sleep(1000) assertEquals(2, sink.receivedMessages.size) assertEquals(42, sink.receivedMessages.get(0)._2.offset) @@ -174,10 +174,10 @@ class TestBrokerProxy extends Logging { @Test def brokerProxyThrowsExceptionOnDuplicateTopicPartitions() = { val (bp, tp, _) = getMockBrokerProxy() bp.start - bp.addTopicPartition(tp, "0") + bp.addTopicPartition(tp, Option("0")) try { - bp.addTopicPartition(tp, "1") + bp.addTopicPartition(tp, Option("1")) fail("Should have thrown an exception") } catch { case se: SamzaException => assertEquals(se.getMessage, "Already consuming TopicPartition [Redbird,2012]") @@ -256,7 +256,7 @@ class TestBrokerProxy extends Logging { } } - bp.addTopicPartition(tp, "earliest") + bp.addTopicPartition(tp, Option("earliest")) bp.start countdownLatch.await() bp.stop
