Repository: samza
Updated Branches:
  refs/heads/master 9f323c950 -> 4187ba953
SAMZA-1728: BootstrappingChooser - Call checkOffset only for a lagging partition while choosing.

Author: Aditya Toomula <[email protected]>
Reviewers: Jagadish<[email protected]>

Closes #533 from atoomula/chooser

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4187ba95
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4187ba95
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4187ba95

Branch: refs/heads/master
Commit: 4187ba9535da5c88a9e22754927da58600b48447
Parents: 9f323c9
Author: Aditya Toomula <[email protected]>
Authored: Tue May 29 18:48:02 2018 -0700
Committer: Jagadish <[email protected]>
Committed: Tue May 29 18:48:02 2018 -0700

----------------------------------------------------------------------
 .../system/chooser/BootstrappingChooser.scala | 10 ++--
 .../chooser/TestBootstrappingChooser.scala | 62 ++++++++++++++++++--
 2 files changed, 63 insertions(+), 9 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/4187ba95/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
index 38e2cfa..f50f27d 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
@@ -196,12 +196,12 @@ class BootstrappingChooser( val systemStream = systemStreamPartition.getSystemStream updatedSystemStreams += systemStream -> (updatedSystemStreams.getOrElse(systemStream, 0) - 1) - } - // If the offset we just read is the same as the offset for the last - // message (newest) in this system stream partition, then we
have read - // all messages, and can mark this SSP as bootstrapped. - checkOffset(systemStreamPartition, offset, OffsetType.NEWEST) + // If the offset we just read is the same as the offset for the last + // message (newest) in this system stream partition, then we have read + // all messages, and can mark this SSP as bootstrapped. + checkOffset(systemStreamPartition, offset, OffsetType.NEWEST) + } } envelope http://git-wip-us.apache.org/repos/asf/samza/blob/4187ba95/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala index a017518..1a99355 100644 --- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala +++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala @@ -37,10 +37,14 @@ import scala.collection.JavaConverters._ @RunWith(value = classOf[Parameterized]) class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, SystemStreamMetadata]) => MessageChooser) { - val envelope1 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), "120", null, 1); - val envelope2 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(1)), "121", null, 2); - val envelope3 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(0)), "122", null, 3); - val envelope4 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), "123", null, 4); + val envelope1 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), "120", null, 1) + val envelope2 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", 
new Partition(1)), "121", null, 2) + val envelope3 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(0)), "122", null, 3) + val envelope4 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), "123", null, 4) + val envelope5 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(1)), "124", null, 5) + val envelope6 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(1)), "125", null, 6) + val envelope7 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(0)), "124", null, 7) + val envelope8 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(0)), "125", null, 8) /** * Helper function to create metadata for a single envelope with a single offset. @@ -202,6 +206,56 @@ class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, Sy } @Test + def testChooserShouldHaveNoLaggingSspsAfterCaughtUp { + val mockMessageChooser = new MockMessageChooser + val sspMetadataMap = + Map(envelope3.getSystemStreamPartition.getPartition -> new SystemStreamPartitionMetadata(null, "123", null), + envelope2.getSystemStreamPartition.getPartition -> new SystemStreamPartitionMetadata(null, "123", null)) + val metadata = new SystemStreamMetadata( + envelope3.getSystemStreamPartition.getStream, + sspMetadataMap.asJava) + val systemAdmins = mock(classOf[SystemAdmins]) + when(systemAdmins.getSystemAdmin("kafka")).thenReturn(new MockSystemAdmin) + val chooser = new BootstrappingChooser(mockMessageChooser, Map(envelope2.getSystemStreamPartition.getSystemStream -> metadata), + new BootstrappingChooserMetrics(), systemAdmins) + + chooser.register(envelope2.getSystemStreamPartition, "1") + chooser.register(envelope3.getSystemStreamPartition, "1") + chooser.start + + // There should be 2 lagging partitions + 
assertEquals(Map(envelope2.getSystemStreamPartition.getSystemStream -> 2), chooser.systemStreamLagCounts) + + assertNull(chooser.choose) + chooser.update(envelope5) // ssp1 is now marked as not lagging + assertEquals(envelope5, chooser.choose) + + // There should be 1 lagging partition + assertEquals(Map(envelope2.getSystemStreamPartition.getSystemStream -> 1), chooser.systemStreamLagCounts) + + // Update with one more envelope from ssp1 and make sure that systemStreamLagCounts is still 1 + chooser.update(envelope6) + assertEquals(null, chooser.choose) // no events are expected to be chosen from ssp1 until lagging ssp0 has envelopes + + chooser.update(envelope3) + assertEquals(envelope6, chooser.choose) + assertEquals(envelope3, chooser.choose) + + // There should still be 1 lagging partition + assertEquals(Map(envelope2.getSystemStreamPartition.getSystemStream -> 1), chooser.systemStreamLagCounts) + + chooser.update(envelope7) + assertEquals(envelope7, chooser.choose) // ssp0 is now marked as not lagging + + // chooser should not have any lagging partitions + assertTrue(chooser.laggingSystemStreamPartitions.isEmpty) + assertTrue(chooser.systemStreamLagCounts.isEmpty) + + chooser.update(envelope8) + assertEquals(envelope8, chooser.choose) + } + + @Test def testChooserRegisterWithStreamUsedAsBootstrapAndBroadcast: Unit = { val mockMessageChooser = new MockMessageChooser val metadata1 = getMetadata(envelope1, "123")
