Updated Branches:
  refs/heads/master 5709b27ff -> 830a23e17

SAMZA-140; update next offset gauge in broker proxy when a topicpartition is removed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/830a23e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/830a23e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/830a23e1

Branch: refs/heads/master
Commit: 830a23e17efe9b432a2517edc11b4e93b0b22807
Parents: 5709b27
Author: Chris Riccomini <[email protected]>
Authored: Mon Feb 10 12:02:23 2014 -0800
Committer: Chris Riccomini <[email protected]>
Committed: Mon Feb 10 12:02:23 2014 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/samza/system/kafka/BrokerProxy.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/830a23e1/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index 95a1fb5..34727e9 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -97,11 +97,13 @@ class BrokerProxy(
 
   def removeTopicPartition(tp: TopicAndPartition) = {
     if (nextOffsets.containsKey(tp)) {
-      nextOffsets.remove(tp)
+      val offset = nextOffsets.remove(tp)
       metrics.topicPartitions(host, port).set(nextOffsets.size)
       debug("Removed %s" format tp)
+      offset
     } else {
       warn("Asked to remove topic and partition %s, but not in map (keys = %s)" format (tp, nextOffsets.keys.mkString(",")))
+      None
     }
   }
 
@@ -163,7 +165,7 @@ class BrokerProxy(
 
   def handleErrors(errorResponses: mutable.Set[Entry[TopicAndPartition, FetchResponsePartitionData]], response:FetchResponse) = {
     // Need to be mindful of a tp that was removed by another thread
-    def abdicate(tp:TopicAndPartition) = nextOffsets.remove(tp) match {
+    def abdicate(tp:TopicAndPartition) = removeTopicPartition(tp) match {
         case Some(offset) => messageSink.abdicate(tp, offset -1)
         case None         => warn("Tried to abdicate for topic partition not in map. Removed in interim?")
       }

Reply via email to