Updated Branches:
  refs/heads/master ae7801707 -> a268b7e0b

SAMZA-86; Convert GetOffset.getNextOffset to use Option instead of null when no 
offset is available.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/a268b7e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/a268b7e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/a268b7e0

Branch: refs/heads/master
Commit: a268b7e0ba763d639fb01513fdc39ac5a4ef1edd
Parents: ae78017
Author: Rekha Joshi <[email protected]>
Authored: Tue Dec 10 12:38:31 2013 -0800
Committer: Chris Riccomini <[email protected]>
Committed: Tue Dec 10 12:38:31 2013 -0800

----------------------------------------------------------------------
 .../scala/org/apache/samza/system/kafka/BrokerProxy.scala |  2 +-
 .../scala/org/apache/samza/system/kafka/GetOffset.scala   |  4 ++--
 .../apache/samza/system/kafka/KafkaSystemConsumer.scala   |  2 +-
 .../org/apache/samza/system/kafka/TestBrokerProxy.scala   | 10 +++++-----
 4 files changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/a268b7e0/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index 7db32c0..45a0ce9 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -77,7 +77,7 @@ abstract class BrokerProxy(
     sc
   }
 
-  def addTopicPartition(tp: TopicAndPartition, lastCheckpointedOffset: String) 
= {
+  def addTopicPartition(tp: TopicAndPartition, lastCheckpointedOffset: 
Option[String]) = {
     debug("Adding new topic and partition %s to queue for %s" format (tp, 
host))
     if (nextOffsets.containsKey(tp)) toss("Already consuming TopicPartition 
%s" format tp)
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/a268b7e0/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
index 7ad5435..f69e772 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
@@ -87,7 +87,7 @@ class GetOffset(default: String, autoOffsetResetTopics: 
Map[String, String] = Ma
    * @param lastCheckpointedOffset None is acceptable. If defined, return the 
last checkpointed offset, after checking it is valid
    * @return Next offset to read or throw an exception if one has been 
received via the simple consumer
    */
-  def getNextOffset(sc: DefaultFetchSimpleConsumer, tp: TopicAndPartition, 
lastCheckpointedOffset: String): Long = {
+  def getNextOffset(sc: DefaultFetchSimpleConsumer, tp: TopicAndPartition, 
lastCheckpointedOffset: Option[String]): Long = {
     val offsetRequest = new OffsetRequest(Map(tp -> new 
PartitionOffsetRequestInfo(getAutoOffset(tp.topic), 1)))
     val offsetResponse = sc.getOffsetsBefore(offsetRequest)
     val partitionOffsetResponse = 
offsetResponse.partitionErrorAndOffsets.get(tp).getOrElse(toss("Unable to find 
offset information for %s" format tp))
@@ -96,7 +96,7 @@ class GetOffset(default: String, autoOffsetResetTopics: 
Map[String, String] = Ma
 
     val autoOffset = 
partitionOffsetResponse.offsets.headOption.getOrElse(toss("Got response, but no 
offsets defined for %s" format tp))
 
-    val actualOffset = Option(lastCheckpointedOffset) match {
+    val actualOffset = lastCheckpointedOffset match {
       case Some(last) => useLastCheckpointedOffset(sc, last, 
tp).getOrElse(autoOffset)
       case None => autoOffset
     }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/a268b7e0/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index 2b73c61..5dbcd94 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -118,7 +118,7 @@ private[kafka] class KafkaSystemConsumer(
                 val messageSink: MessageSink = sink
               })
 
-              brokerProxy.addTopicPartition(head, lastOffset)
+              brokerProxy.addTopicPartition(head, Option(lastOffset))
             case None => warn("No such topic-partition: %s, dropping." format 
head)
           }
           rest

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/a268b7e0/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
index 151c699..89fd3ac 100644
--- 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
+++ 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
@@ -162,9 +162,9 @@ class TestBrokerProxy extends Logging {
     val (bp, tp, sink) = getMockBrokerProxy()
 
     bp.start
-    bp.addTopicPartition(tp, "0")
+    bp.addTopicPartition(tp, Option("0"))
     // Add tp2, which should never receive messages since sink disables it.
-    bp.addTopicPartition(tp2, "0")
+    bp.addTopicPartition(tp2, Option("0"))
     Thread.sleep(1000)
     assertEquals(2, sink.receivedMessages.size)
     assertEquals(42, sink.receivedMessages.get(0)._2.offset)
@@ -174,10 +174,10 @@ class TestBrokerProxy extends Logging {
   @Test def brokerProxyThrowsExceptionOnDuplicateTopicPartitions() = {
     val (bp, tp, _) = getMockBrokerProxy()
     bp.start
-    bp.addTopicPartition(tp, "0")
+    bp.addTopicPartition(tp, Option("0"))
 
     try {
-      bp.addTopicPartition(tp, "1")
+      bp.addTopicPartition(tp, Option("1"))
       fail("Should have thrown an exception")
     } catch {
       case se: SamzaException => assertEquals(se.getMessage, "Already 
consuming TopicPartition [Redbird,2012]")
@@ -256,7 +256,7 @@ class TestBrokerProxy extends Logging {
       }
     }
 
-    bp.addTopicPartition(tp, "earliest")
+    bp.addTopicPartition(tp, Option("earliest"))
     bp.start
     countdownLatch.await()
     bp.stop

Reply via email to