Updated Branches: refs/heads/master bb0abb670 -> 7bbde2cc2
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala new file mode 100644 index 0000000..8f5fb66 --- /dev/null +++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.system.chooser + +import org.junit.Assert._ +import org.junit.Test +import org.apache.samza.system.IncomingMessageEnvelope +import scala.collection.immutable.Queue +import org.apache.samza.system.SystemStreamPartition +import org.apache.samza.Partition +import org.junit.runner.RunWith +import org.junit.runners.Parameterized +import org.junit.runners.Parameterized.Parameters +import java.util.Arrays + +/** Bootstrapping tests, run against both BootstrappingChooser and DefaultChooser (see the companion object). getChooser wraps a MessageChooser together with a map of each bootstrap SSP's latest offset. An SSP registered behind that offset is lagging; in these tests, while any lagging SSP has no buffered envelope, choose is expected to return null. An SSP catches up once it is registered at, or updated with an envelope at, its latest offset, and SSPs missing from the map never block. */ @RunWith(value = classOf[Parameterized]) +class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStreamPartition, String]) => MessageChooser) { + val envelope1 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), null, null, 1); + val envelope2 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(1)), null, null, 2); + val envelope3 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(0)), null, null, 3); + val envelope4 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), "123", null, 4); + + @Test + def testChooserShouldIgnoreStreamsThatArentInOffsetMap { + val mock = new MockMessageChooser + val chooser = getChooser(mock, Map()) + + chooser.register(envelope1.getSystemStreamPartition, "foo") + chooser.start + assertEquals(1, mock.starts) + assertEquals("foo", mock.registers(envelope1.getSystemStreamPartition)) + chooser.update(envelope1) + assertEquals(envelope1, chooser.choose) + assertEquals(null, chooser.choose) + chooser.stop + assertEquals(1, mock.stops) + } + + @Test + def testChooserShouldEliminateCaughtUpStreamsOnRegister { + val mock = new MockMessageChooser + val chooser = getChooser(mock, Map(envelope1.getSystemStreamPartition -> "123")) + + // Even though envelope1's SSP is registered as a bootstrap stream, since + // 123=123, it should be marked as "caught up" and treated like a normal + // stream.
This means that non-bootstrap stream envelope should be allowed + // to be chosen. + chooser.register(envelope1.getSystemStreamPartition, "123") + chooser.register(envelope2.getSystemStreamPartition, "321") + chooser.start + chooser.update(envelope2) + assertEquals(envelope2, chooser.choose) + assertEquals(null, chooser.choose) + } + + @Test + def testChooserShouldEliminateCaughtUpStreamsAfterRegister { + val mock = new MockMessageChooser + val chooser = getChooser(mock, Map(envelope1.getSystemStreamPartition -> "123")) + + // envelope1's SSP is registered at offset 1, which is behind its latest + // offset in the map (123), so unlike the previous test it starts out as a + // lagging bootstrap stream, and envelope2's SSP must wait until it has + // caught up. + chooser.register(envelope1.getSystemStreamPartition, "1") + chooser.register(envelope2.getSystemStreamPartition, null) + chooser.start + chooser.update(envelope2) + // Choose should not return anything since bootstrapper is blocking + // wrapped.choose until it gets an update from envelope1's SSP. + assertEquals(null, chooser.choose) + chooser.update(envelope1) + // Now that we have an update from the required SSP, the mock chooser + // should be called, and return. + assertEquals(envelope2, chooser.choose) + // The chooser still has an envelope from envelope1's SSP, so it should + // return. + assertEquals(envelope1, chooser.choose) + // No envelope for envelope1's SSP has been given, so it should block. + chooser.update(envelope2) + assertEquals(null, chooser.choose) + // Now we're giving an envelope with the proper last offset (123), so + // envelope1's SSP should now be treated no differently than envelope2's. + chooser.update(envelope4) + assertEquals(envelope2, chooser.choose) + assertEquals(envelope4, chooser.choose) + assertEquals(null, chooser.choose) + // Should not block here since there are no more lagging bootstrap streams.
+ chooser.update(envelope2) + assertEquals(envelope2, chooser.choose) + assertEquals(null, chooser.choose) + chooser.update(envelope2) + assertEquals(envelope2, chooser.choose) + assertEquals(null, chooser.choose) + } + + @Test + def testChooserShouldWorkWithTwoBootstrapStreams { + val mock = new MockMessageChooser + val chooser = getChooser(mock, Map( + envelope1.getSystemStreamPartition -> "123", + envelope2.getSystemStreamPartition -> "321")) + + chooser.register(envelope1.getSystemStreamPartition, "1") + chooser.register(envelope2.getSystemStreamPartition, "1") + chooser.register(envelope3.getSystemStreamPartition, "1") + chooser.start + chooser.update(envelope1) + assertEquals(null, chooser.choose) + chooser.update(envelope3) + assertEquals(null, chooser.choose) + chooser.update(envelope2) + + // Fully loaded now. + assertEquals(envelope1, chooser.choose) + // Can't pick again because envelope1's SSP is missing. + assertEquals(null, chooser.choose) + chooser.update(envelope1) + // Can pick again. + assertEquals(envelope3, chooser.choose) + // Can still pick since envelope3.SSP isn't being tracked. + assertEquals(envelope2, chooser.choose) + // Can't pick since envelope2.SSP needs an envelope now. + assertEquals(null, chooser.choose) + chooser.update(envelope2) + // Now we get envelope1 again. + assertEquals(envelope1, chooser.choose) + // Can't pick again. + assertEquals(null, chooser.choose) + // Now use envelope4, to trigger "all caught up" for envelope1.SSP. + chooser.update(envelope4) + // Chooser's contents are currently: e2, e4 (System.err.println(mock.getEnvelopes)) + // Add envelope3, whose SSP isn't being tracked.
+ chooser.update(envelope3) + assertEquals(envelope2, chooser.choose) + assertEquals(null, chooser.choose) + chooser.update(envelope2) + // Chooser's contents are currently: e4, e3, e2 (System.err.println(mock.getEnvelopes)) + assertEquals(envelope4, chooser.choose) + // This should be allowed, even though no message from envelope1.SSP is + // available, since envelope4 triggered "all caught up" because its offset + // matches the offset map for this SSP, and we still have an envelope for + // envelope2.SSP in the queue. + assertEquals(envelope3, chooser.choose) + assertEquals(envelope2, chooser.choose) + assertEquals(null, chooser.choose) + // Fin. + } +} + +object TestBootstrappingChooser { + // Test both BootstrappingChooser and DefaultChooser here. DefaultChooser with + // just bootstrap stream offsets defined should behave just like a plain + // vanilla bootstrapping chooser. + @Parameters + def parameters: java.util.Collection[Array[(MessageChooser, Map[SystemStreamPartition, String]) => MessageChooser]] = Arrays.asList( + Array((wrapped: MessageChooser, latestMessageOffsets: Map[SystemStreamPartition, String]) => new BootstrappingChooser(wrapped, latestMessageOffsets)), + Array((wrapped: MessageChooser, latestMessageOffsets: Map[SystemStreamPartition, String]) => new DefaultChooser(wrapped, bootstrapStreamOffsets = latestMessageOffsets))) +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala new file mode 100644 index 0000000..12e12c0 --- /dev/null +++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF)
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.system.chooser + +import org.junit.Assert._ +import org.junit.Test +import org.apache.samza.util.BlockingEnvelopeMap +import org.apache.samza.system.IncomingMessageEnvelope +import org.apache.samza.system.SystemStreamPartition +import org.apache.samza.Partition + +/** Exercises DefaultChooser with batching (batch size 2), two priority tiers (envelope1's stream at Int.MaxValue, envelope2's stream at 1), and bootstrap offsets for envelope1's and envelope5's SSPs, all configured at once. */ class TestDefaultChooser { + val envelope1 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), null, null, 1); + val envelope2 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(1)), null, null, 2); + val envelope3 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream2", new Partition(0)), null, null, 3); + val envelope4 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), "123", null, 4); + val envelope5 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(1)), null, null, 5); + val envelope6 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(1)), "321", null, 6); + val envelope7 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(0)), null, null, 7); + + @Test + def
testDefaultChooserWithBatchingPrioritizationAndBootstrapping { + val mock0 = new MockMessageChooser + val mock1 = new MockMessageChooser + val mock2 = new MockMessageChooser + val chooser = new DefaultChooser( + mock0, + Some(2), + Map( + envelope1.getSystemStreamPartition().getSystemStream -> Int.MaxValue, + envelope2.getSystemStreamPartition().getSystemStream -> 1), + Map( + Int.MaxValue -> mock1, + 1 -> mock2), + Map( + envelope1.getSystemStreamPartition() -> "123", + envelope5.getSystemStreamPartition() -> "321")) + + chooser.register(envelope1.getSystemStreamPartition, null) + chooser.register(envelope2.getSystemStreamPartition, null) + chooser.register(envelope3.getSystemStreamPartition, null) + chooser.register(envelope5.getSystemStreamPartition, null) + chooser.start + assertEquals(null, chooser.choose) + + // Load with a non-bootstrap stream, and should still get null. + chooser.update(envelope3) + assertEquals(null, chooser.choose) + + // Load with a bootstrap stream, should get that envelope. + chooser.update(envelope1) + assertEquals(envelope1, chooser.choose) + + // Should block envelope3 since we have no message from envelope1's bootstrap stream. + assertEquals(null, chooser.choose) + + // Load envelope2 from non-bootstrap stream with higher priority than envelope3. + chooser.update(envelope2) + + // Should block envelope2 since we have no message from envelope1's bootstrap stream. + assertEquals(null, chooser.choose) + + // Test batching by giving chooser envelope1 and envelope5, both from same stream, but envelope1 should be preferred partition. + chooser.update(envelope5) + chooser.update(envelope1) + assertEquals(envelope1, chooser.choose) + + // Now, envelope5 is still loaded, and we've reached our batchSize limit, so loading envelope1 should still let envelope5 through.
+ chooser.update(envelope1) + assertEquals(envelope5, chooser.choose) + assertEquals(envelope1, chooser.choose) + assertEquals(null, chooser.choose) + + // Now we're back to just envelope3, envelope2. Let's catch up envelope1's SSP using envelope4's offset. + chooser.update(envelope4) + assertEquals(envelope4, chooser.choose) + + // Should still block envelopes 2 and 3 because envelope5's partition (the second partition) hasn't caught up yet. + assertEquals(null, chooser.choose) + + // Now catch up the second partition. + chooser.update(envelope6) + assertEquals(envelope6, chooser.choose) + + // Cool, now no streams are being bootstrapped. Envelope2 should be prioritized above envelope3, even though envelope3 was added first. + assertEquals(envelope2, chooser.choose) + + // We should still batch, and prefer envelope2's partition over envelope7, even though they're both from the same stream. + chooser.update(envelope7) + chooser.update(envelope2) + assertEquals(envelope2, chooser.choose) + + // Now envelope2's partition has passed the batchSize, so we should get 7 next. + chooser.update(envelope2) + assertEquals(envelope7, chooser.choose) + assertEquals(envelope2, chooser.choose) + + // Now we should finally get the lowest priority non-bootstrap stream, envelope3.
+ assertEquals(envelope3, chooser.choose) + assertEquals(null, chooser.choose) + } +} + +/** No-op BlockingEnvelopeMap stub whose start and stop do nothing. NOTE(review): `= Unit` evaluates the Unit companion object rather than (); writing `: Unit = {}` would state the no-op intent directly. */ class MockBlockingEnvelopeMap extends BlockingEnvelopeMap { + def start = Unit + def stop = Unit +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-core/src/test/scala/org/apache/samza/system/chooser/TestRoundRobinChooser.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestRoundRobinChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestRoundRobinChooser.scala new file mode 100644 index 0000000..01802b9 --- /dev/null +++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestRoundRobinChooser.scala @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.samza.system.chooser + +import org.junit.Assert._ +import org.junit.Test +import org.apache.samza.Partition +import org.apache.samza.system.IncomingMessageEnvelope +import org.apache.samza.system.SystemStreamPartition +import org.junit.runner.RunWith +import org.junit.runners.Parameterized +import org.junit.runners.Parameterized.Parameters +import java.util.Arrays + +/** Round-robin tests, run against both RoundRobinChooser and a DefaultChooser with no batching, prioritization, or bootstrapping. The chooser under test is expected to return envelopes in the order they were added. */ @RunWith(value = classOf[Parameterized]) +class TestRoundRobinChooser(getChooser: () => MessageChooser) { + @Test + def testRoundRobinChooser { + val chooser = getChooser() + val envelope1 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), null, null, 1); + val envelope2 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(1)), null, null, 2); + val envelope3 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(0)), null, null, 3); + + chooser.register(envelope1.getSystemStreamPartition, null) + chooser.register(envelope2.getSystemStreamPartition, "") + chooser.register(envelope3.getSystemStreamPartition, "123") // None of these registered offsets should affect the ordering below. + chooser.start + + assertEquals(null, chooser.choose) + + // Test one message. + chooser.update(envelope1) + assertEquals(envelope1, chooser.choose) + assertEquals(null, chooser.choose) + + // Verify simple ordering. + chooser.update(envelope1) + chooser.update(envelope2) + chooser.update(envelope3) + + assertEquals(envelope1, chooser.choose) + assertEquals(envelope2, chooser.choose) + assertEquals(envelope3, chooser.choose) + assertEquals(null, chooser.choose) + + // Verify mixed ordering.
+ chooser.update(envelope2) + chooser.update(envelope1) + + assertEquals(envelope2, chooser.choose) + assertEquals(envelope1, chooser.choose) + + chooser.update(envelope1) + chooser.update(envelope2) + + assertEquals(envelope1, chooser.choose) + assertEquals(envelope2, chooser.choose) + assertEquals(null, chooser.choose) + + // Verify simple ordering with different starting envelope. + chooser.update(envelope2) + chooser.update(envelope1) + chooser.update(envelope3) + + assertEquals(envelope2, chooser.choose) + assertEquals(envelope1, chooser.choose) + assertEquals(envelope3, chooser.choose) + assertEquals(null, chooser.choose) + } +} + +object TestRoundRobinChooser { + // Test both RoundRobinChooser and DefaultChooser here. DefaultChooser with + // no batching, prioritization, or bootstrapping should default to just a + // plain vanilla round robin chooser. + @Parameters + def parameters: java.util.Collection[Array[() => MessageChooser]] = Arrays.asList(Array(() => new RoundRobinChooser), Array(() => new DefaultChooser)) +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-core/src/test/scala/org/apache/samza/system/chooser/TestTieredPriorityChooser.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestTieredPriorityChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestTieredPriorityChooser.scala new file mode 100644 index 0000000..4cde630 --- /dev/null +++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestTieredPriorityChooser.scala @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.system.chooser + +import org.junit.Assert._ +import org.junit.Test +import org.apache.samza.system.IncomingMessageEnvelope +import scala.collection.immutable.Queue +import org.apache.samza.system.SystemStreamPartition +import org.apache.samza.Partition +import org.apache.samza.SamzaException +import org.junit.runner.RunWith +import org.junit.runners.Parameterized +import org.junit.runners.Parameterized.Parameters +import java.util.Arrays +import org.apache.samza.system.SystemStream + +/** Priority tests, run against both TieredPriorityChooser and a DefaultChooser with only priorities configured. getChooser takes the stream-to-priority map, the chooser for each priority, and a default chooser for streams with no priority (null means none, in which case updates for such streams are expected to fail). Envelopes from streams with a larger priority number are expected to be chosen first. */ @RunWith(value = classOf[Parameterized]) +class TestTieredPriorityChooser(getChooser: (Map[SystemStream, Int], Map[Int, MessageChooser], MessageChooser) => MessageChooser) { + val envelope1 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), null, null, 1); + val envelope2 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(1)), null, null, 2); + val envelope3 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(0)), null, null, 3); + val envelope4 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), "123", null, 4); + + @Test + def testChooserShouldStartStopAndRegister { + val mock0 = new MockMessageChooser + val mock1 = new MockMessageChooser + val chooser = getChooser( + Map(envelope1.getSystemStreamPartition ->
1), // NOTE(review): this map's key is an SSP, while the other tests key it by getSystemStream; confirm that is intended. + Map(1 -> mock1), + mock0) + + chooser.register(envelope1.getSystemStreamPartition, "foo") + chooser.start + assertEquals("foo", mock1.registers(envelope1.getSystemStreamPartition)) + assertEquals(1, mock0.starts) + assertEquals(1, mock1.starts) + chooser.stop + assertEquals(1, mock0.stops) + assertEquals(1, mock1.stops) + } + + @Test + def testChooserShouldFallBackToDefault { + val mock = new MockMessageChooser + val chooser = getChooser( + Map(), + Map(), + mock) + + chooser.register(envelope1.getSystemStreamPartition, null) + chooser.start + assertEquals(null, chooser.choose) + chooser.update(envelope1) + assertEquals(envelope1, chooser.choose) + assertEquals(null, chooser.choose) + } + + @Test + def testChooserShouldFailWithNoDefault { + val mock = new MockMessageChooser + val chooser = getChooser( + Map(envelope1.getSystemStreamPartition.getSystemStream -> 0), + Map(0 -> mock), + null) + + // The SSP for envelope2 is not defined as a priority stream. + chooser.register(envelope2.getSystemStreamPartition, null) + chooser.start + assertEquals(null, chooser.choose) + + try { + chooser.update(envelope2) + fail("Should have failed due to missing default chooser.") + } catch { + case e: SamzaException => // Expected.
+ } + } + + @Test + def testChooserWithSingleStream { + val mock = new MockMessageChooser + val chooser = getChooser( + Map(envelope1.getSystemStreamPartition.getSystemStream -> 0), + Map(0 -> mock), + new MockMessageChooser) + + chooser.register(envelope1.getSystemStreamPartition, null) + chooser.start + + assertEquals(null, chooser.choose) + chooser.update(envelope1) + chooser.update(envelope4) + assertEquals(envelope1, chooser.choose) + assertEquals(envelope4, chooser.choose) + assertEquals(null, chooser.choose) + + chooser.update(envelope4) + chooser.update(envelope1) + assertEquals(envelope4, chooser.choose) + assertEquals(envelope1, chooser.choose) + assertEquals(null, chooser.choose) + } + + @Test + def testChooserWithSingleStreamWithTwoPartitions { + val mock = new MockMessageChooser + val chooser = getChooser( + Map(envelope2.getSystemStreamPartition.getSystemStream -> 0), + Map(0 -> mock), + new MockMessageChooser) + + chooser.register(envelope2.getSystemStreamPartition, null) + chooser.register(envelope3.getSystemStreamPartition, null) + chooser.start + + assertEquals(null, chooser.choose) + chooser.update(envelope2) + chooser.update(envelope3) + assertEquals(envelope2, chooser.choose) + assertEquals(envelope3, chooser.choose) + assertEquals(null, chooser.choose) + + chooser.update(envelope3) + chooser.update(envelope2) + assertEquals(envelope3, chooser.choose) + assertEquals(envelope2, chooser.choose) + assertEquals(null, chooser.choose) + } + + @Test + def testChooserWithTwoStreamsOfEqualPriority { + val mock = new MockMessageChooser + val chooser = getChooser( + Map( + envelope1.getSystemStreamPartition.getSystemStream -> 0, + envelope2.getSystemStreamPartition.getSystemStream -> 0), + Map(0 -> mock), + new MockMessageChooser) + + chooser.register(envelope1.getSystemStreamPartition, null) + chooser.register(envelope2.getSystemStreamPartition, null) + chooser.start + + assertEquals(null, chooser.choose) + chooser.update(envelope1)
+ chooser.update(envelope4) + assertEquals(envelope1, chooser.choose) + assertEquals(envelope4, chooser.choose) + assertEquals(null, chooser.choose) + + chooser.update(envelope4) + chooser.update(envelope1) + assertEquals(envelope4, chooser.choose) + assertEquals(envelope1, chooser.choose) + assertEquals(null, chooser.choose) + + chooser.update(envelope2) + chooser.update(envelope4) + assertEquals(envelope2, chooser.choose) + assertEquals(envelope4, chooser.choose) + assertEquals(null, chooser.choose) + + chooser.update(envelope1) + chooser.update(envelope2) + assertEquals(envelope1, chooser.choose) + assertEquals(envelope2, chooser.choose) + assertEquals(null, chooser.choose) + } + + @Test + def testChooserWithTwoStreamsOfDifferentPriority { + val mock0 = new MockMessageChooser + val mock1 = new MockMessageChooser + val chooser = getChooser( + Map( + envelope1.getSystemStreamPartition.getSystemStream -> 1, + envelope2.getSystemStreamPartition.getSystemStream -> 0), + Map( + 0 -> mock0, + 1 -> mock1), + new MockMessageChooser) + + chooser.register(envelope1.getSystemStreamPartition, null) + chooser.register(envelope2.getSystemStreamPartition, null) + chooser.start + + assertEquals(null, chooser.choose) + chooser.update(envelope1) + chooser.update(envelope4) + assertEquals(envelope1, chooser.choose) + assertEquals(envelope4, chooser.choose) + assertEquals(null, chooser.choose) + + chooser.update(envelope4) + chooser.update(envelope1) + assertEquals(envelope4, chooser.choose) + assertEquals(envelope1, chooser.choose) + assertEquals(null, chooser.choose) + + chooser.update(envelope2) + chooser.update(envelope4) + // Reversed here because envelope4.SSP=envelope1.SSP which is higher + // priority.
+ assertEquals(envelope4, chooser.choose) + assertEquals(envelope2, chooser.choose) + assertEquals(null, chooser.choose) + + chooser.update(envelope1) + chooser.update(envelope2) + assertEquals(envelope1, chooser.choose) + assertEquals(envelope2, chooser.choose) + assertEquals(null, chooser.choose) + + // Just the low priority stream. + chooser.update(envelope2) + assertEquals(envelope2, chooser.choose) + assertEquals(null, chooser.choose) + } +} + +object TestTieredPriorityChooser { + // Test both TieredPriorityChooser and DefaultChooser here. DefaultChooser with + // just priorities defined should behave just like plain vanilla priority + // chooser. + @Parameters + def parameters: java.util.Collection[Array[(Map[SystemStream, Int], Map[Int, MessageChooser], MessageChooser) => MessageChooser]] = Arrays.asList( + Array((priorities: Map[SystemStream, Int], choosers: Map[Int, MessageChooser], default: MessageChooser) => new TieredPriorityChooser(priorities, choosers, default)), + Array((priorities: Map[SystemStream, Int], choosers: Map[Int, MessageChooser], default: MessageChooser) => new DefaultChooser(default, None, priorities, choosers))) +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala index 183c6cc..c5487b9 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala @@ -25,19 +25,36 @@ import org.apache.samza.util.ClientUtilTopicMetadataStore import kafka.api.TopicMetadata import scala.collection.JavaConversions._ import org.apache.samza.system.SystemAdmin +import
org.apache.samza.SamzaException +import kafka.consumer.SimpleConsumer +import kafka.utils.Utils +import kafka.client.ClientUtils +import java.util.Random +import kafka.api.TopicMetadataRequest +import kafka.common.TopicAndPartition +import kafka.api.PartitionOffsetRequestInfo +import kafka.api.OffsetRequest +import kafka.api.FetchRequestBuilder +import org.apache.samza.system.SystemStreamPartition +import kafka.common.ErrorMapping +import grizzled.slf4j.Logging class KafkaSystemAdmin( systemName: String, // TODO whenever Kafka decides to make the Set[Broker] class public, let's switch to Set[Broker] here. brokerListString: String, - clientId: String = UUID.randomUUID.toString) extends SystemAdmin { + timeout: Int = Int.MaxValue, + bufferSize: Int = 1024000, + clientId: String = UUID.randomUUID.toString) extends SystemAdmin with Logging { - def getPartitions(streamName: String): java.util.Set[Partition] = { - val getTopicMetadata = (topics: Set[String]) => { - new ClientUtilTopicMetadataStore(brokerListString, clientId) - .getTopicInfo(topics) - } + val rand = new Random + + private def getTopicMetadata(topics: Set[String]) = { + new ClientUtilTopicMetadataStore(brokerListString, clientId) + .getTopicInfo(topics) + } + def getPartitions(streamName: String): java.util.Set[Partition] = { val metadata = TopicMetadataCache.getTopicMetadata( Set(streamName), systemName, @@ -48,4 +65,108 @@ class KafkaSystemAdmin( .map(pm => new Partition(pm.partitionId)) .toSet[Partition] } + + /** + * Fetch the offset of the last message in each non-empty partition of the + * given streams. Partitions whose latest offset is <= 0 are left out of the + * result. Failures (missing leaders, Kafka error codes) are logged and retried + * after a short pause until a fetch succeeds; InterruptedException propagates. + */ + def getLastOffsets(streams: java.util.Set[String]) = { + var offsets = Map[SystemStreamPartition, String]() + var done = false + var consumer: SimpleConsumer = null + + debug("Fetching offsets for: %s" format streams) + + while (!done) { + try { + val metadata = TopicMetadataCache.getTopicMetadata( + streams.toSet, + systemName, + getTopicMetadata) + + debug("Got metadata for streams: %s" format metadata) + + // Break topic metadata topic/partitions into per-broker map.
+ val brokersToTopicPartitions = metadata + .values + // Convert the topic metadata to a Seq[(Broker, TopicAndPartition)] + .flatMap(topicMetadata => topicMetadata + .partitionsMetadata + // Convert Seq[PartitionMetadata] to Seq[(Broker, TopicAndPartition)] + .map(partitionMetadata => { + ErrorMapping.maybeThrowException(partitionMetadata.errorCode) + val topicAndPartition = new TopicAndPartition(topicMetadata.topic, partitionMetadata.partitionId) + val leader = partitionMetadata + .leader + .getOrElse(throw new SamzaException("Need leaders for all partitions when fetching offsets. No leader available for TopicAndPartition: %s" format topicAndPartition)) + (leader, topicAndPartition) + })) + // Convert to a Map[Broker, Seq[(Broker, TopicAndPartition)]] + .groupBy(_._1) + // Convert to a Map[Broker, Seq[TopicAndPartition]] + .mapValues(_.map(_._2)) + + debug("Got topic partition data for brokers: %s" format brokersToTopicPartitions) + + // Get the latest offsets for each topic and partition. + for ((broker, topicsAndPartitions) <- brokersToTopicPartitions) { + val partitionOffsetInfo = topicsAndPartitions + .map(topicAndPartition => (topicAndPartition, PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))) + .toMap + consumer = new SimpleConsumer(broker.host, broker.port, timeout, bufferSize, clientId) + val brokerOffsets = consumer + .getOffsetsBefore(new OffsetRequest(partitionOffsetInfo)) + .partitionErrorAndOffsets + .map(partitionAndOffset => { + // Surface per-partition broker errors (mirroring the metadata check above) instead of failing opaquely on offsets.head. + ErrorMapping.maybeThrowException(partitionAndOffset._2.error) + if (partitionAndOffset._2.offsets.head <= 0) { + debug("Filtering out empty topic partition: %s" format partitionAndOffset) + } + + partitionAndOffset + }) + .filter(_._2.offsets.head > 0) + // Kafka returns 1 greater than the offset of the last message in + // the topic, so subtract one to fetch the last message. + .mapValues(_.offsets.head - 1) + + debug("Got offsets: %s" format brokerOffsets) + debug("Shutting down consumer for %s:%s."
format (broker.host, broker.port)) + + consumer.close + consumer = null + + for ((topicAndPartition, offset) <- brokerOffsets) { + offsets += new SystemStreamPartition(systemName, topicAndPartition.topic, new Partition(topicAndPartition.partition)) -> offset.toString + } + } + + done = true + } catch { + case e: InterruptedException => + info("Interrupted while fetching last offsets, so forwarding.") + if (consumer != null) { + consumer.close + } + throw e + case e: Exception => + // Release the consumer left open by the failed attempt so retries don't leak sockets. + if (consumer != null) { + consumer.close + consumer = null + } + warn("Unable to fetch last offsets for streams due to: %s, %s. Retrying. Turn on debugging to get a full stack trace." format (e.getMessage, streams)) + debug(e) + // Back off briefly instead of immediately hammering the brokers again. + Thread.sleep(100) + } + } + + info("Got latest offsets for streams: %s, %s" format (streams, offsets)) + + offsets + } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala index ba08af8..a11a72a 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala @@ -103,7 +103,15 @@ class KafkaSystemFactory extends SystemFactory { val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId) val brokerListString = Option(producerConfig.brokerList) .getOrElse(throw new SamzaException("No broker list defined in config for %s."
format systemName)) + val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId) + val timeout = consumerConfig.socketTimeoutMs + val bufferSize = consumerConfig.socketReceiveBufferBytes - new KafkaSystemAdmin(systemName, brokerListString, clientId) + new KafkaSystemAdmin( + systemName, + brokerListString, + timeout, + bufferSize, + clientId) } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-kafka/src/test/scala/org/apache/samza/checkpoint/TestKafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/TestKafkaCheckpointManager.scala index baf4695..cad2231 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/TestKafkaCheckpointManager.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/TestKafkaCheckpointManager.scala @@ -105,6 +105,7 @@ object TestKafkaCheckpointManager { Utils.rm(server1.config.logDirs) Utils.rm(server2.config.logDirs) Utils.rm(server3.config.logDirs) + zookeeper.shutdown } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala new file mode 100644 index 0000000..f0c6f8a --- /dev/null +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala @@ -0,0 +1,216 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.samza.system.kafka + +import org.junit.Assert._ +import org.junit.Test +import kafka.zk.EmbeddedZookeeper +import org.apache.samza.checkpoint.Checkpoint +import org.junit.BeforeClass +import org.junit.AfterClass +import org.apache.samza.util.ClientUtilTopicMetadataStore +import org.I0Itec.zkclient.ZkClient +import kafka.admin.AdminUtils +import org.apache.samza.util.TopicMetadataStore +import kafka.producer.ProducerConfig +import kafka.utils.TestUtils +import kafka.common.ErrorMapping +import kafka.utils.TestZKUtils +import kafka.server.KafkaServer +import kafka.producer.Producer +import kafka.server.KafkaConfig +import kafka.utils.Utils +import org.apache.samza.system.SystemStream +import kafka.utils.ZKStringSerializer +import scala.collection.JavaConversions._ +import kafka.producer.KeyedMessage +import kafka.message.MessageAndMetadata +import scala.collection.mutable.ArrayBuffer +import kafka.consumer.Consumer +import kafka.consumer.ConsumerConfig +import java.util.Properties +import scala.util.control.NonFatal + +object TestKafkaSystemAdmin { + val TOPIC = "input" + val TOTAL_PARTITIONS = 50 + val REPLICATION_FACTOR = 2 + + val zkConnect: String = TestZKUtils.zookeeperConnect + var zkClient: ZkClient = null + val zkConnectionTimeout = 6000 + val zkSessionTimeout = 6000 + val brokerId1 = 0 + val brokerId2 = 1 + val brokerId3 = 2 +
+ val ports = TestUtils.choosePorts(3) + val (port1, port2, port3) = (ports(0), ports(1), ports(2)) + + val props1 = TestUtils.createBrokerConfig(brokerId1, port1) + val config1 = new KafkaConfig(props1) { + override val hostName = "localhost" + override val numPartitions = 1 + override val zkConnect = TestKafkaSystemAdmin.zkConnect + "/" + } + val props2 = TestUtils.createBrokerConfig(brokerId2, port2) + val config2 = new KafkaConfig(props2) { + override val hostName = "localhost" + override val numPartitions = 1 + override val zkConnect = TestKafkaSystemAdmin.zkConnect + "/" + } + val props3 = TestUtils.createBrokerConfig(brokerId3, port3) + val config3 = new KafkaConfig(props3) { + override val hostName = "localhost" + override val numPartitions = 1 + override val zkConnect = TestKafkaSystemAdmin.zkConnect + "/" + } + + val config = new java.util.Properties() + val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, port3) + config.put("metadata.broker.list", brokers) + config.put("producer.type", "sync") + config.put("request.required.acks", "-1") + config.put("serializer.class", "kafka.serializer.StringEncoder"); + val producerConfig = new ProducerConfig(config) + var producer: Producer[String, String] = null + var zookeeper: EmbeddedZookeeper = null + var server1: KafkaServer = null + var server2: KafkaServer = null + var server3: KafkaServer = null + var metadataStore: TopicMetadataStore = null + + @BeforeClass + def beforeSetupServers { + zookeeper = new EmbeddedZookeeper(zkConnect) + server1 = TestUtils.createServer(config1) + server2 = TestUtils.createServer(config2) + server3 = TestUtils.createServer(config3) + zkClient = new ZkClient(zkConnect + "/", zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer) + producer = new Producer(producerConfig) + metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name") + } + + def createTopic { + AdminUtils.createTopic( + zkClient, + TOPIC, + TOTAL_PARTITIONS, + REPLICATION_FACTOR) + } + + // Poll topic metadata until the test topic reports no error, failing after maxRetries attempts. + def
validateTopic { + var done = false + var retries = 0 + val maxRetries = 100 + + while (!done && retries < maxRetries) { + try { + val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(TOPIC), "kafka", metadataStore.getTopicInfo) + val topicMetadata = topicMetadataMap(TOPIC) + val errorCode = topicMetadata.errorCode + + ErrorMapping.maybeThrowException(errorCode) + + done = true + } catch { + case NonFatal(e) => + System.err.println("Got exception while validating test topics. Waiting and retrying: " + e) + retries += 1 + Thread.sleep(500) + } + } + + if (retries >= maxRetries) { + fail("Unable to successfully create topics. Tried to validate %s times." format retries) + } + } + + def getConsumerConnector = { + val props = new Properties + + props.put("zookeeper.connect", zkConnect) + props.put("group.id", "test") + props.put("auto.offset.reset", "smallest") + + val consumerConfig = new ConsumerConfig(props) + Consumer.create(consumerConfig) + } + + @AfterClass + def afterCleanLogDirs { + // Release the producer created in beforeSetupServers before tearing down the brokers. + producer.close + server1.shutdown + server1.awaitShutdown() + server2.shutdown + server2.awaitShutdown() + server3.shutdown + server3.awaitShutdown() + Utils.rm(server1.config.logDirs) + Utils.rm(server2.config.logDirs) + Utils.rm(server3.config.logDirs) + zkClient.close + zookeeper.shutdown + } +} + +class TestKafkaSystemAdmin { + import TestKafkaSystemAdmin._ + + @Test + def testShouldGetLastOffsets { + val systemName = "test" + val systemAdmin = new KafkaSystemAdmin(systemName, brokers) + + // Get a non-existent topic. + val initialInputOffsets = systemAdmin.getLastOffsets(Set("foo")) + assertEquals(0, initialInputOffsets.size) + + // Create an empty topic with 50 partitions, but with no offsets. + createTopic + validateTopic + val createdInputOffsets = systemAdmin.getLastOffsets(Set(TOPIC)) + assertEquals(0, createdInputOffsets.size) + + // Add a new message to one of the partitions.
+ producer.send(new KeyedMessage(TOPIC, "key1", "val1")) + val oneMessageInputOffsets = systemAdmin.getLastOffsets(Set(TOPIC)) + // Check the size before calling head so an empty result fails with a clear assertion. + assertEquals(1, oneMessageInputOffsets.size) + val ssp = oneMessageInputOffsets.keySet.head + assertEquals(systemName, ssp.getSystem) + assertEquals(TOPIC, ssp.getStream) + // key1 gets hash-mod'd to partition 48. + assertEquals(48, ssp.getPartition.getPartitionId) + + // Validate that a fetch will return the message. + val connector = getConsumerConnector + val stream = connector.createMessageStreams(Map(TOPIC -> 1)).get(TOPIC).get.get(0).iterator + val message = stream.next + val text = new String(message.message, "UTF-8") + connector.shutdown + // Message's offset should match the expected latest offset. + assertEquals(oneMessageInputOffsets(ssp), message.offset.toString) + assertEquals("val1", text) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala index 7d4e20a..9602a52 100644 --- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala +++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.samza.test.integration import org.apache.samza.task.StreamTask @@ -175,6 +194,7 @@ object TestStatefulTask { Utils.rm(server2.config.logDirs) Utils.rm(server3.config.logDirs) zkClient.close + zookeeper.shutdown } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala index 6805052..c9f7029 100644 --- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala +++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala @@ -41,6 +41,7 @@ import TestSamzaAppMasterTaskManager._ import org.apache.samza.system.SystemAdmin import org.apache.samza.system.SystemFactory import org.apache.samza.metrics.MetricsRegistry +import org.apache.samza.SamzaException object TestSamzaAppMasterTaskManager { def getContainer(containerId: ContainerId) = new Container { @@ -428,4 +429,6 @@ class MockSystemFactory extends SystemFactory { class MockSinglePartitionManager extends SystemAdmin { def getPartitions(streamName: String) = Set(new Partition(0)) + + def getLastOffsets(streams: java.util.Set[String]) = throw new SamzaException("Need to implement this") // Stub: this mock only supports getPartitions; any call to getLastOffsets fails with SamzaException. }
