Updated Branches: refs/heads/master 04d00f5a4 -> 01caadbca
Revert SAMZA-86. Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/75711b4f Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/75711b4f Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/75711b4f Branch: refs/heads/master Commit: 75711b4f1b0d71e4f1e9829f07b2c19e3e4eb47b Parents: 04d00f5 Author: Jakob Homan <[email protected]> Authored: Thu Dec 12 11:55:30 2013 -0800 Committer: Jakob Homan <[email protected]> Committed: Thu Dec 12 11:55:30 2013 -0800 ---------------------------------------------------------------------- .../scala/org/apache/samza/system/kafka/BrokerProxy.scala | 2 +- .../scala/org/apache/samza/system/kafka/GetOffset.scala | 4 ++-- .../apache/samza/system/kafka/KafkaSystemConsumer.scala | 2 +- .../org/apache/samza/system/kafka/TestBrokerProxy.scala | 10 +++++----- 4 files changed, 9 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/75711b4f/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala index 45a0ce9..7db32c0 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala @@ -77,7 +77,7 @@ abstract class BrokerProxy( sc } - def addTopicPartition(tp: TopicAndPartition, lastCheckpointedOffset: Option[String]) = { + def addTopicPartition(tp: TopicAndPartition, lastCheckpointedOffset: String) = { debug("Adding new topic and partition %s to queue for %s" format (tp, host)) if (nextOffsets.containsKey(tp)) toss("Already consuming TopicPartition %s" format tp) http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/75711b4f/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala index f69e772..7ad5435 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala @@ -87,7 +87,7 @@ class GetOffset(default: String, autoOffsetResetTopics: Map[String, String] = Ma * @param lastCheckpointedOffset Null is acceptable. If not null, return the last checkpointed offset, after checking it is valid * @return Next offset to read or throw an exception if one has been received via the simple consumer */ - def getNextOffset(sc: DefaultFetchSimpleConsumer, tp: TopicAndPartition, lastCheckpointedOffset: Option[String]): Long = { + def getNextOffset(sc: DefaultFetchSimpleConsumer, tp: TopicAndPartition, lastCheckpointedOffset: String): Long = { val offsetRequest = new OffsetRequest(Map(tp -> new PartitionOffsetRequestInfo(getAutoOffset(tp.topic), 1))) val offsetResponse = sc.getOffsetsBefore(offsetRequest) val partitionOffsetResponse = offsetResponse.partitionErrorAndOffsets.get(tp).getOrElse(toss("Unable to find offset information for %s" format tp)) @@ -96,7 +96,7 @@ class GetOffset(default: String, autoOffsetResetTopics: Map[String, String] = Ma val autoOffset = partitionOffsetResponse.offsets.headOption.getOrElse(toss("Got response, but no offsets defined for %s" format tp)) - val actualOffset = lastCheckpointedOffset match { + val actualOffset = Option(lastCheckpointedOffset) match { case Some(last) => useLastCheckpointedOffset(sc, last, tp).getOrElse(autoOffset) case None => autoOffset } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/75711b4f/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala index 5dbcd94..2b73c61 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala @@ -118,7 +118,7 @@ private[kafka] class KafkaSystemConsumer( val messageSink: MessageSink = sink }) - brokerProxy.addTopicPartition(head, Option(lastOffset)) + brokerProxy.addTopicPartition(head, lastOffset) case None => warn("No such topic-partition: %s, dropping." format head) } rest http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/75711b4f/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala index 89fd3ac..151c699 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala @@ -162,9 +162,9 @@ class TestBrokerProxy extends Logging { val (bp, tp, sink) = getMockBrokerProxy() bp.start - bp.addTopicPartition(tp, Option("0")) + bp.addTopicPartition(tp, "0") // Add tp2, which should never receive messages since sink disables it. - bp.addTopicPartition(tp2, Option("0")) + bp.addTopicPartition(tp2, "0") Thread.sleep(1000) assertEquals(2, sink.receivedMessages.size) assertEquals(42, sink.receivedMessages.get(0)._2.offset) @@ -174,10 +174,10 @@ class TestBrokerProxy extends Logging { @Test def brokerProxyThrowsExceptionOnDuplicateTopicPartitions() = { val (bp, tp, _) = getMockBrokerProxy() bp.start - bp.addTopicPartition(tp, Option("0")) + bp.addTopicPartition(tp, "0") try { - bp.addTopicPartition(tp, Option("1")) + bp.addTopicPartition(tp, "1") fail("Should have thrown an exception") } catch { case se: SamzaException => assertEquals(se.getMessage, "Already consuming TopicPartition [Redbird,2012]") @@ -256,7 +256,7 @@ class TestBrokerProxy extends Logging { } } - bp.addTopicPartition(tp, Option("earliest")) + bp.addTopicPartition(tp, "earliest") bp.start countdownLatch.await() bp.stop
