Repository: samza
Updated Branches:
  refs/heads/0.9.1 176b9abdb -> 72c290535

SAMZA-720: Fix BootstrapChooser hanging issue. Backport to 0.9.1

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/72c29053
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/72c29053
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/72c29053

Branch: refs/heads/0.9.1
Commit: 72c290535af67ffc0804524455cb08f56cc155c2
Parents: 176b9ab
Author: Yan Fang <[email protected]>
Authored: Tue Jun 23 15:43:26 2015 -0700
Committer: Yi Pan (Data Infrastructure) <[email protected]>
Committed: Tue Jun 23 15:43:26 2015 -0700

----------------------------------------------------------------------
 .../system/chooser/BootstrappingChooser.scala     | 11 +++++++++++
 .../system/chooser/TestBootstrappingChooser.scala | 18 ++++++++++++++++++
 2 files changed, 29 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/72c29053/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
index dd500b9..1cd8e06 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
@@ -91,12 +91,21 @@ class BootstrappingChooser(
     .toSet
 
   /**
+   * Store all the systemStreamPartitions registered
+   */
+  var registeredSystemStreamPartitions = Set[SystemStreamPartition]()
+
+  /**
    * The number of lagging partitions that the underlying wrapped chooser has
    * been updated with, grouped by SystemStream.
    */
   var updatedSystemStreams = Map[SystemStream, Int]()
 
   def start = {
+    // remove the systemStreamPartitions not registered.
+    laggingSystemStreamPartitions = laggingSystemStreamPartitions.filter(registeredSystemStreamPartitions.contains(_))
+    systemStreamLagCounts = laggingSystemStreamPartitions.groupBy(_.getSystemStream).map {case (systemStream, ssps) => systemStream -> ssps.size}
+
     debug("Starting bootstrapping chooser with bootstrap metadata: %s" format bootstrapStreamMetadata)
     info("Got lagging partition counts for bootstrap streams: %s" format systemStreamLagCounts)
     metrics.setLaggingSystemStreams(() => laggingSystemStreamPartitions.size)
@@ -118,6 +127,8 @@ class BootstrappingChooser(
     checkOffset(systemStreamPartition, offset, OffsetType.UPCOMING)
 
     wrapped.register(systemStreamPartition, offset)
+
+    registeredSystemStreamPartitions += systemStreamPartition
   }
 
   def update(envelope: IncomingMessageEnvelope) {

http://git-wip-us.apache.org/repos/asf/samza/blob/72c29053/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
index 3c2693c..2e0180d 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
@@ -180,6 +180,24 @@ class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, Sy
     assertNull(chooser.choose)
     // Fin.
   }
+
+  @Test
+  def testChooserRegisteredCorrectSsps {
+    val mock = new MockMessageChooser
+    val metadata1 = getMetadata(envelope1, "123")
+    val metadata2 = getMetadata(envelope2, "321")
+    val chooser = new BootstrappingChooser(mock, Map(envelope1.getSystemStreamPartition.getSystemStream -> metadata1, envelope2.getSystemStreamPartition.getSystemStream -> metadata2))
+
+    chooser.register(envelope1.getSystemStreamPartition, "1")
+    chooser.register(envelope2.getSystemStreamPartition, "1")
+    chooser.start
+
+    // it should only contain stream partition 0 and stream1 partition 1
+    val expectedLaggingSsps = Set(envelope1.getSystemStreamPartition, envelope2.getSystemStreamPartition)
+    assertEquals(expectedLaggingSsps, chooser.laggingSystemStreamPartitions)
+    val expectedSystemStreamLagCounts = Map(envelope1.getSystemStreamPartition.getSystemStream -> 1, envelope2.getSystemStreamPartition.getSystemStream -> 1)
+    assertEquals(expectedSystemStreamLagCounts, chooser.systemStreamLagCounts)
+  }
 }
 
 object TestBootstrappingChooser {
