Repository: samza
Updated Branches:
  refs/heads/0.9.1 176b9abdb -> 72c290535


SAMZA-720: Fix BootstrappingChooser hanging issue. Backport to 0.9.1


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/72c29053
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/72c29053
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/72c29053

Branch: refs/heads/0.9.1
Commit: 72c290535af67ffc0804524455cb08f56cc155c2
Parents: 176b9ab
Author: Yan Fang <[email protected]>
Authored: Tue Jun 23 15:43:26 2015 -0700
Committer: Yi Pan (Data Infrastructure) <[email protected]>
Committed: Tue Jun 23 15:43:26 2015 -0700

----------------------------------------------------------------------
 .../system/chooser/BootstrappingChooser.scala     | 11 +++++++++++
 .../system/chooser/TestBootstrappingChooser.scala | 18 ++++++++++++++++++
 2 files changed, 29 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/72c29053/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
 
b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
index dd500b9..1cd8e06 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
@@ -91,12 +91,21 @@ class BootstrappingChooser(
     .toSet
 
   /**
+   * The set of all SystemStreamPartitions registered with this chooser.
+   */
+  var registeredSystemStreamPartitions = Set[SystemStreamPartition]()
+
+  /**
    * The number of lagging partitions that the underlying wrapped chooser has
    * been updated with, grouped by SystemStream.
    */
   var updatedSystemStreams = Map[SystemStream, Int]()
 
   def start = {
+    // remove the systemStreamPartitions not registered.
+    laggingSystemStreamPartitions = 
laggingSystemStreamPartitions.filter(registeredSystemStreamPartitions.contains(_))
+    systemStreamLagCounts = 
laggingSystemStreamPartitions.groupBy(_.getSystemStream).map {case 
(systemStream, ssps) => systemStream -> ssps.size}
+
     debug("Starting bootstrapping chooser with bootstrap metadata: %s" format 
bootstrapStreamMetadata)
     info("Got lagging partition counts for bootstrap streams: %s" format 
systemStreamLagCounts)
     metrics.setLaggingSystemStreams(() => laggingSystemStreamPartitions.size)
@@ -118,6 +127,8 @@ class BootstrappingChooser(
     checkOffset(systemStreamPartition, offset, OffsetType.UPCOMING)
 
     wrapped.register(systemStreamPartition, offset)
+
+    registeredSystemStreamPartitions += systemStreamPartition
   }
 
   def update(envelope: IncomingMessageEnvelope) {

http://git-wip-us.apache.org/repos/asf/samza/blob/72c29053/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
 
b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
index 3c2693c..2e0180d 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
@@ -180,6 +180,24 @@ class TestBootstrappingChooser(getChooser: 
(MessageChooser, Map[SystemStream, Sy
     assertNull(chooser.choose)
     // Fin.
   }
+
+  @Test
+  def testChooserRegisteredCorrectSsps {
+    val mock = new MockMessageChooser
+    val metadata1 = getMetadata(envelope1, "123")
+    val metadata2 = getMetadata(envelope2, "321")
+    val chooser = new BootstrappingChooser(mock, 
Map(envelope1.getSystemStreamPartition.getSystemStream -> metadata1, 
envelope2.getSystemStreamPartition.getSystemStream -> metadata2))
+
+    chooser.register(envelope1.getSystemStreamPartition, "1")
+    chooser.register(envelope2.getSystemStreamPartition, "1")
+    chooser.start
+
+    // after start, only the registered SSPs (envelope1's and envelope2's) should remain lagging
+    val expectedLaggingSsps = Set(envelope1.getSystemStreamPartition, 
envelope2.getSystemStreamPartition)
+    assertEquals(expectedLaggingSsps, chooser.laggingSystemStreamPartitions)
+    val expectedSystemStreamLagCounts = 
Map(envelope1.getSystemStreamPartition.getSystemStream -> 1, 
envelope2.getSystemStreamPartition.getSystemStream -> 1)
+    assertEquals(expectedSystemStreamLagCounts, chooser.systemStreamLagCounts)
+  }
 }
 
 object TestBootstrappingChooser {

Reply via email to