Updated Branches:
  refs/heads/master df6bd392a -> bda8df6e2

SAMZA-86: Convert GetOffset.getNextOffset to use Option instead of null when no 
offset is available. Re-applying the patch with the NPE issue fixed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/bda8df6e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/bda8df6e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/bda8df6e

Branch: refs/heads/master
Commit: bda8df6e21c02060831ca05a1a23af1685f0ea07
Parents: df6bd39
Author: Rekha Joshi <[email protected]>
Authored: Thu Jan 2 10:02:01 2014 -0800
Committer: Chris Riccomini <[email protected]>
Committed: Thu Jan 2 10:02:01 2014 -0800

----------------------------------------------------------------------
 .../org/apache/samza/system/kafka/BrokerProxy.scala     |  4 ++--
 .../scala/org/apache/samza/system/kafka/GetOffset.scala |  6 +++---
 .../apache/samza/system/kafka/KafkaSystemConsumer.scala |  2 +-
 .../org/apache/samza/system/kafka/TestBrokerProxy.scala | 12 ++++++------
 4 files changed, 12 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bda8df6e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index 7db32c0..53b2e22 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -77,7 +77,7 @@ abstract class BrokerProxy(
     sc
   }
 
-  def addTopicPartition(tp: TopicAndPartition, lastCheckpointedOffset: String) 
= {
+  def addTopicPartition(tp: TopicAndPartition, lastCheckpointedOffset: 
Option[String]) = {
     debug("Adding new topic and partition %s to queue for %s" format (tp, 
host))
     if (nextOffsets.containsKey(tp)) toss("Already consuming TopicPartition 
%s" format tp)
 
@@ -181,7 +181,7 @@ abstract class BrokerProxy(
       warn("Received OffsetOutOfRange exception for %s. Current offset = %s" 
format (e.tp, nextOffsets.getOrElse(e.tp, "not found in map, likely removed in 
the interim")))
 
       try {
-        val newOffset = offsetGetter.getNextOffset(simpleConsumer, e.tp, null)
+        val newOffset = offsetGetter.getNextOffset(simpleConsumer, e.tp, 
Option(null))
         // Put the new offset into the map (if the tp still exists).  Will 
catch it on the next go-around
         nextOffsets.replace(e.tp, newOffset)
       } catch {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bda8df6e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
index 7ad5435..25cd52c 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
@@ -87,7 +87,7 @@ class GetOffset(default: String, autoOffsetResetTopics: 
Map[String, String] = Ma
    * @param lastCheckpointedOffset Null is acceptable. If not null, return the 
last checkpointed offset, after checking it is valid
    * @return Next offset to read or throw an exception if one has been 
received via the simple consumer
    */
-  def getNextOffset(sc: DefaultFetchSimpleConsumer, tp: TopicAndPartition, 
lastCheckpointedOffset: String): Long = {
+ def getNextOffset(sc: DefaultFetchSimpleConsumer, tp: TopicAndPartition, 
lastCheckpointedOffset: Option[String]): Long = {
     val offsetRequest = new OffsetRequest(Map(tp -> new 
PartitionOffsetRequestInfo(getAutoOffset(tp.topic), 1)))
     val offsetResponse = sc.getOffsetsBefore(offsetRequest)
     val partitionOffsetResponse = 
offsetResponse.partitionErrorAndOffsets.get(tp).getOrElse(toss("Unable to find 
offset information for %s" format tp))
@@ -96,9 +96,9 @@ class GetOffset(default: String, autoOffsetResetTopics: 
Map[String, String] = Ma
 
     val autoOffset = 
partitionOffsetResponse.offsets.headOption.getOrElse(toss("Got response, but no 
offsets defined for %s" format tp))
 
-    val actualOffset = Option(lastCheckpointedOffset) match {
+    val actualOffset = lastCheckpointedOffset match {
       case Some(last) => useLastCheckpointedOffset(sc, last, 
tp).getOrElse(autoOffset)
-      case None => autoOffset
+      case _ => autoOffset
     }
 
     info("Final offset to be returned for Topic and Partition %s = %d" format 
(tp, actualOffset))

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bda8df6e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index 2b73c61..5dbcd94 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -118,7 +118,7 @@ private[kafka] class KafkaSystemConsumer(
                 val messageSink: MessageSink = sink
               })
 
-              brokerProxy.addTopicPartition(head, lastOffset)
+              brokerProxy.addTopicPartition(head, Option(lastOffset))
             case None => warn("No such topic-partition: %s, dropping." format 
head)
           }
           rest

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bda8df6e/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
index 151c699..9a3a29e 100644
--- 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
+++ 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
@@ -162,9 +162,9 @@ class TestBrokerProxy extends Logging {
     val (bp, tp, sink) = getMockBrokerProxy()
 
     bp.start
-    bp.addTopicPartition(tp, "0")
+    bp.addTopicPartition(tp, Option("0"))
     // Add tp2, which should never receive messages since sink disables it.
-    bp.addTopicPartition(tp2, "0")
+    bp.addTopicPartition(tp2, Option("0"))
     Thread.sleep(1000)
     assertEquals(2, sink.receivedMessages.size)
     assertEquals(42, sink.receivedMessages.get(0)._2.offset)
@@ -174,10 +174,10 @@ class TestBrokerProxy extends Logging {
   @Test def brokerProxyThrowsExceptionOnDuplicateTopicPartitions() = {
     val (bp, tp, _) = getMockBrokerProxy()
     bp.start
-    bp.addTopicPartition(tp, "0")
+    bp.addTopicPartition(tp, Option("0"))
 
     try {
-      bp.addTopicPartition(tp, "1")
+      bp.addTopicPartition(tp, Option("1"))
       fail("Should have thrown an exception")
     } catch {
       case se: SamzaException => assertEquals(se.getMessage, "Already 
consuming TopicPartition [Redbird,2012]")
@@ -199,7 +199,7 @@ class TestBrokerProxy extends Logging {
 
     val mockOffsetGetter = mock(classOf[GetOffset])
     // This will be used by the simple consumer below, and this is the 
response that simple consumer needs
-    
when(mockOffsetGetter.getNextOffset(any(classOf[DefaultFetchSimpleConsumer]), 
Matchers.eq(tp), Matchers.eq(null))).thenReturn(1492l)
+    
when(mockOffsetGetter.getNextOffset(any(classOf[DefaultFetchSimpleConsumer]), 
Matchers.eq(tp), Matchers.eq(Option(null)))).thenReturn(1492l)
 
     var callsToCreateSimpleConsumer = 0
     val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer])
@@ -256,7 +256,7 @@ class TestBrokerProxy extends Logging {
       }
     }
 
-    bp.addTopicPartition(tp, "earliest")
+    bp.addTopicPartition(tp, Option("earliest"))
     bp.start
     countdownLatch.await()
     bp.stop

Reply via email to