Updated Branches: refs/heads/master 5709b27ff -> 830a23e17
SAMZA-140; update next offset gauge in broker proxy when a topicpartition is removed. Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/830a23e1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/830a23e1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/830a23e1 Branch: refs/heads/master Commit: 830a23e17efe9b432a2517edc11b4e93b0b22807 Parents: 5709b27 Author: Chris Riccomini <[email protected]> Authored: Mon Feb 10 12:02:23 2014 -0800 Committer: Chris Riccomini <[email protected]> Committed: Mon Feb 10 12:02:23 2014 -0800 ---------------------------------------------------------------------- .../main/scala/org/apache/samza/system/kafka/BrokerProxy.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/830a23e1/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala index 95a1fb5..34727e9 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala @@ -97,11 +97,13 @@ class BrokerProxy( def removeTopicPartition(tp: TopicAndPartition) = { if (nextOffsets.containsKey(tp)) { - nextOffsets.remove(tp) + val offset = nextOffsets.remove(tp) metrics.topicPartitions(host, port).set(nextOffsets.size) debug("Removed %s" format tp) + offset } else { warn("Asked to remove topic and partition %s, but not in map (keys = %s)" format (tp, nextOffsets.keys.mkString(","))) + None } } @@ -163,7 +165,7 @@ class BrokerProxy( def handleErrors(errorResponses: mutable.Set[Entry[TopicAndPartition, FetchResponsePartitionData]], response:FetchResponse) = { // Need to be mindful of a tp that was removed by another thread - def abdicate(tp:TopicAndPartition) = nextOffsets.remove(tp) match { + def abdicate(tp:TopicAndPartition) = removeTopicPartition(tp) match { case Some(offset) => messageSink.abdicate(tp, offset -1) case None => warn("Tried to abdicate for topic partition not in map. Removed in interim?") }
