Updated Branches: refs/heads/master df6bd392a -> bda8df6e2
SAMZA-86; Convert GetOffset.getNextOffset to use Option instead of null when no offset is available. Applying patch again, with fixed NPE issue. Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/bda8df6e Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/bda8df6e Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/bda8df6e Branch: refs/heads/master Commit: bda8df6e21c02060831ca05a1a23af1685f0ea07 Parents: df6bd39 Author: Rekha Joshi <[email protected]> Authored: Thu Jan 2 10:02:01 2014 -0800 Committer: Chris Riccomini <[email protected]> Committed: Thu Jan 2 10:02:01 2014 -0800 ---------------------------------------------------------------------- .../org/apache/samza/system/kafka/BrokerProxy.scala | 4 ++-- .../scala/org/apache/samza/system/kafka/GetOffset.scala | 6 +++--- .../apache/samza/system/kafka/KafkaSystemConsumer.scala | 2 +- .../org/apache/samza/system/kafka/TestBrokerProxy.scala | 12 ++++++------ 4 files changed, 12 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bda8df6e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala index 7db32c0..53b2e22 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala @@ -77,7 +77,7 @@ abstract class BrokerProxy( sc } - def addTopicPartition(tp: TopicAndPartition, lastCheckpointedOffset: String) = { + def addTopicPartition(tp: TopicAndPartition, lastCheckpointedOffset: Option[String]) = { debug("Adding new topic and partition %s to queue for %s" format (tp, host)) if (nextOffsets.containsKey(tp)) toss("Already consuming TopicPartition %s" format tp) @@ -181,7 +181,7 @@ abstract class BrokerProxy( warn("Received OffsetOutOfRange exception for %s. Current offset = %s" format (e.tp, nextOffsets.getOrElse(e.tp, "not found in map, likely removed in the interim"))) try { - val newOffset = offsetGetter.getNextOffset(simpleConsumer, e.tp, null) + val newOffset = offsetGetter.getNextOffset(simpleConsumer, e.tp, Option(null)) // Put the new offset into the map (if the tp still exists). Will catch it on the next go-around nextOffsets.replace(e.tp, newOffset) } catch { http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bda8df6e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala index 7ad5435..25cd52c 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala @@ -87,7 +87,7 @@ class GetOffset(default: String, autoOffsetResetTopics: Map[String, String] = Ma * @param lastCheckpointedOffset Null is acceptable. If not null, return the last checkpointed offset, after checking it is valid * @return Next offset to read or throw an exception if one has been received via the simple consumer */ - def getNextOffset(sc: DefaultFetchSimpleConsumer, tp: TopicAndPartition, lastCheckpointedOffset: String): Long = { + def getNextOffset(sc: DefaultFetchSimpleConsumer, tp: TopicAndPartition, lastCheckpointedOffset: Option[String]): Long = { val offsetRequest = new OffsetRequest(Map(tp -> new PartitionOffsetRequestInfo(getAutoOffset(tp.topic), 1))) val offsetResponse = sc.getOffsetsBefore(offsetRequest) val partitionOffsetResponse = offsetResponse.partitionErrorAndOffsets.get(tp).getOrElse(toss("Unable to find offset information for %s" format tp)) @@ -96,9 +96,9 @@ class GetOffset(default: String, autoOffsetResetTopics: Map[String, String] = Ma val autoOffset = partitionOffsetResponse.offsets.headOption.getOrElse(toss("Got response, but no offsets defined for %s" format tp)) - val actualOffset = Option(lastCheckpointedOffset) match { + val actualOffset = lastCheckpointedOffset match { case Some(last) => useLastCheckpointedOffset(sc, last, tp).getOrElse(autoOffset) - case None => autoOffset + case _ => autoOffset } info("Final offset to be returned for Topic and Partition %s = %d" format (tp, actualOffset)) http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bda8df6e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala index 2b73c61..5dbcd94 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala @@ -118,7 +118,7 @@ private[kafka] class KafkaSystemConsumer( val messageSink: MessageSink = sink }) - brokerProxy.addTopicPartition(head, lastOffset) + brokerProxy.addTopicPartition(head, Option(lastOffset)) case None => warn("No such topic-partition: %s, dropping." format head) } rest http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bda8df6e/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala index 151c699..9a3a29e 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala @@ -162,9 +162,9 @@ class TestBrokerProxy extends Logging { val (bp, tp, sink) = getMockBrokerProxy() bp.start - bp.addTopicPartition(tp, "0") + bp.addTopicPartition(tp, Option("0")) // Add tp2, which should never receive messages since sink disables it. - bp.addTopicPartition(tp2, "0") + bp.addTopicPartition(tp2, Option("0")) Thread.sleep(1000) assertEquals(2, sink.receivedMessages.size) assertEquals(42, sink.receivedMessages.get(0)._2.offset) @@ -174,10 +174,10 @@ class TestBrokerProxy extends Logging { @Test def brokerProxyThrowsExceptionOnDuplicateTopicPartitions() = { val (bp, tp, _) = getMockBrokerProxy() bp.start - bp.addTopicPartition(tp, "0") + bp.addTopicPartition(tp, Option("0")) try { - bp.addTopicPartition(tp, "1") + bp.addTopicPartition(tp, Option("1")) fail("Should have thrown an exception") } catch { case se: SamzaException => assertEquals(se.getMessage, "Already consuming TopicPartition [Redbird,2012]") @@ -199,7 +199,7 @@ class TestBrokerProxy extends Logging { val mockOffsetGetter = mock(classOf[GetOffset]) // This will be used by the simple consumer below, and this is the response that simple consumer needs - when(mockOffsetGetter.getNextOffset(any(classOf[DefaultFetchSimpleConsumer]), Matchers.eq(tp), Matchers.eq(null))).thenReturn(1492l) + when(mockOffsetGetter.getNextOffset(any(classOf[DefaultFetchSimpleConsumer]), Matchers.eq(tp), Matchers.eq(Option(null)))).thenReturn(1492l) var callsToCreateSimpleConsumer = 0 val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer]) @@ -256,7 +256,7 @@ class TestBrokerProxy extends Logging { } } - bp.addTopicPartition(tp, "earliest") + bp.addTopicPartition(tp, Option("earliest")) bp.start countdownLatch.await() bp.stop
