Updated Branches:
  refs/heads/master 04d00f5a4 -> 01caadbca

Revert SAMZA-86.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/75711b4f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/75711b4f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/75711b4f

Branch: refs/heads/master
Commit: 75711b4f1b0d71e4f1e9829f07b2c19e3e4eb47b
Parents: 04d00f5
Author: Jakob Homan <[email protected]>
Authored: Thu Dec 12 11:55:30 2013 -0800
Committer: Jakob Homan <[email protected]>
Committed: Thu Dec 12 11:55:30 2013 -0800

----------------------------------------------------------------------
 .../scala/org/apache/samza/system/kafka/BrokerProxy.scala |  2 +-
 .../scala/org/apache/samza/system/kafka/GetOffset.scala   |  4 ++--
 .../apache/samza/system/kafka/KafkaSystemConsumer.scala   |  2 +-
 .../org/apache/samza/system/kafka/TestBrokerProxy.scala   | 10 +++++-----
 4 files changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/75711b4f/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index 45a0ce9..7db32c0 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -77,7 +77,7 @@ abstract class BrokerProxy(
     sc
   }
 
-  def addTopicPartition(tp: TopicAndPartition, lastCheckpointedOffset: 
Option[String]) = {
+  def addTopicPartition(tp: TopicAndPartition, lastCheckpointedOffset: String) 
= {
     debug("Adding new topic and partition %s to queue for %s" format (tp, 
host))
     if (nextOffsets.containsKey(tp)) toss("Already consuming TopicPartition 
%s" format tp)
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/75711b4f/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
index f69e772..7ad5435 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
@@ -87,7 +87,7 @@ class GetOffset(default: String, autoOffsetResetTopics: 
Map[String, String] = Ma
    * @param lastCheckpointedOffset Null is acceptable. If not null, return the 
last checkpointed offset, after checking it is valid
    * @return Next offset to read or throw an exception if one has been 
received via the simple consumer
    */
-  def getNextOffset(sc: DefaultFetchSimpleConsumer, tp: TopicAndPartition, 
lastCheckpointedOffset: Option[String]): Long = {
+  def getNextOffset(sc: DefaultFetchSimpleConsumer, tp: TopicAndPartition, 
lastCheckpointedOffset: String): Long = {
     val offsetRequest = new OffsetRequest(Map(tp -> new 
PartitionOffsetRequestInfo(getAutoOffset(tp.topic), 1)))
     val offsetResponse = sc.getOffsetsBefore(offsetRequest)
     val partitionOffsetResponse = 
offsetResponse.partitionErrorAndOffsets.get(tp).getOrElse(toss("Unable to find 
offset information for %s" format tp))
@@ -96,7 +96,7 @@ class GetOffset(default: String, autoOffsetResetTopics: 
Map[String, String] = Ma
 
     val autoOffset = 
partitionOffsetResponse.offsets.headOption.getOrElse(toss("Got response, but no 
offsets defined for %s" format tp))
 
-    val actualOffset = lastCheckpointedOffset match {
+    val actualOffset = Option(lastCheckpointedOffset) match {
       case Some(last) => useLastCheckpointedOffset(sc, last, 
tp).getOrElse(autoOffset)
       case None => autoOffset
     }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/75711b4f/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index 5dbcd94..2b73c61 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -118,7 +118,7 @@ private[kafka] class KafkaSystemConsumer(
                 val messageSink: MessageSink = sink
               })
 
-              brokerProxy.addTopicPartition(head, Option(lastOffset))
+              brokerProxy.addTopicPartition(head, lastOffset)
             case None => warn("No such topic-partition: %s, dropping." format 
head)
           }
           rest

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/75711b4f/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
index 89fd3ac..151c699 100644
--- 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
+++ 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
@@ -162,9 +162,9 @@ class TestBrokerProxy extends Logging {
     val (bp, tp, sink) = getMockBrokerProxy()
 
     bp.start
-    bp.addTopicPartition(tp, Option("0"))
+    bp.addTopicPartition(tp, "0")
     // Add tp2, which should never receive messages since sink disables it.
-    bp.addTopicPartition(tp2, Option("0"))
+    bp.addTopicPartition(tp2, "0")
     Thread.sleep(1000)
     assertEquals(2, sink.receivedMessages.size)
     assertEquals(42, sink.receivedMessages.get(0)._2.offset)
@@ -174,10 +174,10 @@ class TestBrokerProxy extends Logging {
   @Test def brokerProxyThrowsExceptionOnDuplicateTopicPartitions() = {
     val (bp, tp, _) = getMockBrokerProxy()
     bp.start
-    bp.addTopicPartition(tp, Option("0"))
+    bp.addTopicPartition(tp, "0")
 
     try {
-      bp.addTopicPartition(tp, Option("1"))
+      bp.addTopicPartition(tp, "1")
       fail("Should have thrown an exception")
     } catch {
       case se: SamzaException => assertEquals(se.getMessage, "Already 
consuming TopicPartition [Redbird,2012]")
@@ -256,7 +256,7 @@ class TestBrokerProxy extends Logging {
       }
     }
 
-    bp.addTopicPartition(tp, Option("earliest"))
+    bp.addTopicPartition(tp, "earliest")
     bp.start
     countdownLatch.await()
     bp.stop

Reply via email to