Repository: incubator-samza
Updated Branches:
  refs/heads/master 5ae028595 -> 55929095f


SAMZA-166: TestStatefulTask fails.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/55929095
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/55929095
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/55929095

Branch: refs/heads/master
Commit: 55929095f5abc0911278ca8336bccbc835122c84
Parents: 5ae0285
Author: Chris Riccomini <[email protected]>
Authored: Wed Mar 5 12:40:16 2014 -0800
Committer: Jakob Homan <[email protected]>
Committed: Wed Mar 5 12:40:16 2014 -0800

----------------------------------------------------------------------
 .../test/integration/TestStatefulTask.scala     | 55 +++++++++++++++-----
 1 file changed, 43 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/55929095/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
index d36de26..493c984 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
@@ -185,7 +185,7 @@ object TestStatefulTask {
 
 /**
  * Test that does the following:
- * 
+ *
  * 1. Starts ZK, and 3 kafka brokers.
  * 2. Create two topics: input and mystore.
  * 3. Validate that the topics were created successfully and have leaders.
@@ -211,12 +211,24 @@ class TestStatefulTask {
     "stores.mystore.factory" -> "org.apache.samza.storage.kv.KeyValueStorageEngineFactory",
     "stores.mystore.key.serde" -> "string",
     "stores.mystore.msg.serde" -> "string",
-    "stores.mystore.changelog" -> "kafka.mystore",
+    "stores.mystore.changelog" -> "kafka-state.mystore",
+
+    // TODO we don't need two different systems once SAMZA-157 is committed. 
+    // We will be able to do per-stream offset defaults.
+
+    // Use smallest reset for input streams, so we can fix SAMZA-166.
     "systems.kafka.samza.factory" -> "org.apache.samza.system.kafka.KafkaSystemFactory",
     "systems.kafka.consumer.zookeeper.connect" -> zkConnect,
-    "systems.kafka.consumer.auto.offset.reset" -> "largest",
+    "systems.kafka.consumer.auto.offset.reset" -> "smallest",
     "systems.kafka.producer.metadata.broker.list" -> ("localhost:%s" format port1),
-    "systems.kafka.samza.msg.serde" -> "string")
+    "systems.kafka.samza.msg.serde" -> "string",
+
+    // Use largest offset for changelog stream, so we can test SAMZA-142.
+    "systems.kafka-state.samza.factory" -> "org.apache.samza.system.kafka.KafkaSystemFactory",
+    "systems.kafka-state.consumer.zookeeper.connect" -> zkConnect,
+    "systems.kafka-state.consumer.auto.offset.reset" -> "largest",
+    "systems.kafka-state.producer.metadata.broker.list" -> ("localhost:%s" format port1),
+    "systems.kafka-state.samza.msg.serde" -> "string")
 
   @Test
   def testShouldStartAndRestore {
@@ -260,30 +272,49 @@ class TestStatefulTask {
     assertTrue(task.restored.contains("2"))
     assertTrue(task.restored.contains("3"))
 
+    var count = 0
+
+    // We should get the original four messages in the stream (1,2,3,2).
+    // Note that this will trigger four new outgoing messages to the STATE_TOPIC.
+    while (task.received.size < 4 && count < 100) {
+      Thread.sleep(600)
+      count += 1
+    }
+
+    assertTrue(count < 100)
+
+    // Reset the count down latch after the 4 messages come in.
+    task.awaitMessage
+
     // Send some messages to input stream.
     send(task, "4")
     send(task, "5")
     send(task, "5")
 
     // Validate that messages appear in store stream.
-    val messages = readAll(STATE_TOPIC, 6, "testShouldRestoreStore")
+    val messages = readAll(STATE_TOPIC, 10, "testShouldRestoreStore")
 
-    assertEquals(7, messages.length)
+    assertEquals(11, messages.length)
     // From initial start.
     assertEquals("1", messages(0))
     assertEquals("2", messages(1))
     assertEquals("3", messages(2))
     assertEquals("2", messages(3))
+    // From second startup.
+    assertEquals("1", messages(4))
+    assertEquals("2", messages(5))
+    assertEquals("3", messages(6))
+    assertEquals("2", messages(7))
     // From sending in this method.
-    assertEquals("4", messages(4))
-    assertEquals("5", messages(5))
-    assertEquals("5", messages(6))
+    assertEquals("4", messages(8))
+    assertEquals("5", messages(9))
+    assertEquals("5", messages(10))
 
     stopJob(job)
   }
 
   /**
-   * Start a job for TestJob, and do some basic sanity checks around startup 
+   * Start a job for TestJob, and do some basic sanity checks around startup
    * time, number of partitions, etc.
    */
   def startJob = {
@@ -308,7 +339,7 @@ class TestStatefulTask {
   }
 
   /**
-   * Kill a job, and wait for an unsuccessful finish (since this throws an 
+   * Kill a job, and wait for an unsuccessful finish (since this throws an
    * interrupt, which is forwarded on to ThreadJob, and marked as a failure).
    */
   def stopJob(job: StreamJob) {
@@ -327,7 +358,7 @@ class TestStatefulTask {
   }
 
   /**
-   * Read all messages from a topic starting from last saved offset for group. 
+   * Read all messages from a topic starting from last saved offset for group.
    * To read all from offset 0, specify a unique, new group string.
    */
   def readAll(topic: String, maxOffsetInclusive: Int, group: String): List[String] = {

Reply via email to