Repository: incubator-samza Updated Branches: refs/heads/master 5ae028595 -> 55929095f
SAMZA-166: TestStatefulTask fails. Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/55929095 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/55929095 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/55929095 Branch: refs/heads/master Commit: 55929095f5abc0911278ca8336bccbc835122c84 Parents: 5ae0285 Author: Chris Riccomini <[email protected]> Authored: Wed Mar 5 12:40:16 2014 -0800 Committer: Jakob Homan <[email protected]> Committed: Wed Mar 5 12:40:16 2014 -0800 ---------------------------------------------------------------------- .../test/integration/TestStatefulTask.scala | 55 +++++++++++++++----- 1 file changed, 43 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/55929095/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala index d36de26..493c984 100644 --- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala +++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala @@ -185,7 +185,7 @@ object TestStatefulTask { /** * Test that does the following: - * + * * 1. Starts ZK, and 3 kafka brokers. * 2. Create two topics: input and mystore. * 3. Validate that the topics were created successfully and have leaders. @@ -211,12 +211,24 @@ class TestStatefulTask { "stores.mystore.factory" -> "org.apache.samza.storage.kv.KeyValueStorageEngineFactory", "stores.mystore.key.serde" -> "string", "stores.mystore.msg.serde" -> "string", - "stores.mystore.changelog" -> "kafka.mystore", + "stores.mystore.changelog" -> "kafka-state.mystore", + + // TODO we don't need two different systems once SAMZA-157 is committed. + // We will be able to do per-stream offset defaults. + + // Use smallest reset for input streams, so we can fix SAMZA-166. "systems.kafka.samza.factory" -> "org.apache.samza.system.kafka.KafkaSystemFactory", "systems.kafka.consumer.zookeeper.connect" -> zkConnect, - "systems.kafka.consumer.auto.offset.reset" -> "largest", + "systems.kafka.consumer.auto.offset.reset" -> "smallest", "systems.kafka.producer.metadata.broker.list" -> ("localhost:%s" format port1), - "systems.kafka.samza.msg.serde" -> "string") + "systems.kafka.samza.msg.serde" -> "string", + + // Use largest offset for changelog stream, so we can test SAMZA-142. + "systems.kafka-state.samza.factory" -> "org.apache.samza.system.kafka.KafkaSystemFactory", + "systems.kafka-state.consumer.zookeeper.connect" -> zkConnect, + "systems.kafka-state.consumer.auto.offset.reset" -> "smallest", + "systems.kafka-state.producer.metadata.broker.list" -> ("localhost:%s" format port1), + "systems.kafka-state.samza.msg.serde" -> "string") @Test def testShouldStartAndRestore { @@ -260,30 +272,49 @@ class TestStatefulTask { assertTrue(task.restored.contains("2")) assertTrue(task.restored.contains("3")) + var count = 0 + + // We should get the original four messages in the stream (1,2,3,2). + // Note that this will trigger four new outgoing messages to the STATE_TOPIC. + while (task.received.size < 4 && count < 100) { + Thread.sleep(600) + count += 1 + } + + assertTrue(count < 100) + + // Reset the count down latch after the 4 messages come in. + task.awaitMessage + // Send some messages to input stream. send(task, "4") send(task, "5") send(task, "5") // Validate that messages appear in store stream. - val messages = readAll(STATE_TOPIC, 6, "testShouldRestoreStore") + val messages = readAll(STATE_TOPIC, 10, "testShouldRestoreStore") - assertEquals(7, messages.length) + assertEquals(11, messages.length) // From initial start. assertEquals("1", messages(0)) assertEquals("2", messages(1)) assertEquals("3", messages(2)) assertEquals("2", messages(3)) + // From second startup. + assertEquals("1", messages(4)) + assertEquals("2", messages(5)) + assertEquals("3", messages(6)) + assertEquals("2", messages(7)) // From sending in this method. - assertEquals("4", messages(4)) - assertEquals("5", messages(5)) - assertEquals("5", messages(6)) + assertEquals("4", messages(8)) + assertEquals("5", messages(9)) + assertEquals("5", messages(10)) stopJob(job) } /** - * Start a job for TestJob, and do some basic sanity checks around startup + * Start a job for TestJob, and do some basic sanity checks around startup * time, number of partitions, etc. */ def startJob = { @@ -308,7 +339,7 @@ class TestStatefulTask { } /** - * Kill a job, and wait for an unsuccessful finish (since this throws an + * Kill a job, and wait for an unsuccessful finish (since this throws an * interrupt, which is forwarded on to ThreadJob, and marked as a failure). */ def stopJob(job: StreamJob) { @@ -327,7 +358,7 @@ class TestStatefulTask { } /** - * Read all messages from a topic starting from last saved offset for group. + * Read all messages from a topic starting from last saved offset for group. * To read all from offset 0, specify a unique, new group string. */ def readAll(topic: String, maxOffsetInclusive: Int, group: String): List[String] = {
