Updated Branches:
  refs/heads/master bb0abb670 -> 7bbde2cc2

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
 
b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
new file mode 100644
index 0000000..8f5fb66
--- /dev/null
+++ 
b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.chooser
+
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.samza.system.IncomingMessageEnvelope
+import scala.collection.immutable.Queue
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.Partition
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.runners.Parameterized.Parameters
+import java.util.Arrays
+
+@RunWith(value = classOf[Parameterized])
+class TestBootstrappingChooser(getChooser: (MessageChooser, 
Map[SystemStreamPartition, String]) => MessageChooser) {
+  val envelope1 = new IncomingMessageEnvelope(new 
SystemStreamPartition("kafka", "stream", new Partition(0)), null, null, 1);
+  val envelope2 = new IncomingMessageEnvelope(new 
SystemStreamPartition("kafka", "stream1", new Partition(1)), null, null, 2);
+  val envelope3 = new IncomingMessageEnvelope(new 
SystemStreamPartition("kafka", "stream1", new Partition(0)), null, null, 3);
+  val envelope4 = new IncomingMessageEnvelope(new 
SystemStreamPartition("kafka", "stream", new Partition(0)), "123", null, 4);
+
+  @Test
+  def testChooserShouldIgnoreStreamsThatArentInOffsetMap {
+    val mock = new MockMessageChooser
+    val chooser = getChooser(mock, Map())
+
+    chooser.register(envelope1.getSystemStreamPartition, "foo")
+    chooser.start
+    assertEquals(1, mock.starts)
+    assertEquals("foo", mock.registers(envelope1.getSystemStreamPartition))
+    chooser.update(envelope1)
+    assertEquals(envelope1, chooser.choose)
+    assertEquals(null, chooser.choose)
+    chooser.stop
+    assertEquals(1, mock.stops)
+  }
+
+  @Test
+  def testChooserShouldEliminateCaughtUpStreamsOnRegister {
+    val mock = new MockMessageChooser
+    val chooser = getChooser(mock, Map(envelope1.getSystemStreamPartition -> 
"123"))
+
+    // Even though envelope1's SSP is registered as a bootstrap stream, since 
+    // 123=123, it should be marked as "caught up" and treated like a normal 
+    // stream. This means that non-bootstrap stream envelope should be allowed 
+    // to be chosen.
+    chooser.register(envelope1.getSystemStreamPartition, "123")
+    chooser.register(envelope2.getSystemStreamPartition, "321")
+    chooser.start
+    chooser.update(envelope2)
+    assertEquals(envelope2, chooser.choose)
+    assertEquals(null, chooser.choose)
+  }
+
+  @Test
+  def testChooserShouldEliminateCaughtUpStreamsAfterRegister {
+    val mock = new MockMessageChooser
+    val chooser = getChooser(mock, Map(envelope1.getSystemStreamPartition -> 
"123"))
+
+    // envelope1's SSP is registered as a bootstrap stream with last offset
+    // "123", but it registers at offset "1", so it is NOT caught up yet.
+    // The chooser must therefore block all other streams until it receives
+    // envelopes from envelope1's SSP up through offset 123.
+    chooser.register(envelope1.getSystemStreamPartition, "1")
+    chooser.register(envelope2.getSystemStreamPartition, null)
+    chooser.start
+    chooser.update(envelope2)
+    // Choose should not return anything since bootstrapper is blocking 
+    // wrapped.choose until it gets an update from envelope1's SSP.
+    assertEquals(null, chooser.choose)
+    chooser.update(envelope1)
+    // Now that we have an update from the required SSP, the mock chooser 
+    // should be called, and return.
+    assertEquals(envelope2, chooser.choose)
+    // The chooser still has an envelope from envelope1's SSP, so it should 
+    // return.
+    assertEquals(envelope1, chooser.choose)
+    // No envelope for envelope1's SSP has been given, so it should block.
+    chooser.update(envelope2)
+    assertEquals(null, chooser.choose)
+    // Now we're giving an envelope with the proper last offset (123), so
+    // envelope1's SSP should be treated no differently than envelope2's.
+    chooser.update(envelope4)
+    assertEquals(envelope2, chooser.choose)
+    assertEquals(envelope4, chooser.choose)
+    assertEquals(null, chooser.choose)
+    // Should not block here since there are no more lagging bootstrap streams.
+    chooser.update(envelope2)
+    assertEquals(envelope2, chooser.choose)
+    assertEquals(null, chooser.choose)
+    chooser.update(envelope2)
+    assertEquals(envelope2, chooser.choose)
+    assertEquals(null, chooser.choose)
+  }
+
+  @Test
+  def testChooserShouldWorkWithTwoBootstrapStreams {
+    val mock = new MockMessageChooser
+    val chooser = getChooser(mock, Map(
+      envelope1.getSystemStreamPartition -> "123",
+      envelope2.getSystemStreamPartition -> "321"))
+
+    chooser.register(envelope1.getSystemStreamPartition, "1")
+    chooser.register(envelope2.getSystemStreamPartition, "1")
+    chooser.register(envelope3.getSystemStreamPartition, "1")
+    chooser.start
+    chooser.update(envelope1)
+    assertEquals(null, chooser.choose)
+    chooser.update(envelope3)
+    assertEquals(null, chooser.choose)
+    chooser.update(envelope2)
+
+    // Fully loaded now.
+    assertEquals(envelope1, chooser.choose)
+    // Can't pick again because envelope1's SSP is missing.
+    assertEquals(null, chooser.choose)
+    chooser.update(envelope1)
+    // Can pick again.
+    assertEquals(envelope3, chooser.choose)
+    // Can still pick since envelope3.SSP isn't being tracked.
+    assertEquals(envelope2, chooser.choose)
+    // Can't pick since envelope2.SSP needs an envelope now.
+    assertEquals(null, chooser.choose)
+    chooser.update(envelope2)
+    // Now we get envelope1 again.
+    assertEquals(envelope1, chooser.choose)
+    // Can't pick again.
+    assertEquals(null, chooser.choose)
+    // Now use envelope4, to trigger "all caught up" for envelope1.SSP.
+    chooser.update(envelope4)
+    // Chooser's contents is currently: e2, e4 
(System.err.println(mock.getEnvelopes))
+    // Add envelope3, whose SSP isn't being tracked.
+    chooser.update(envelope3)
+    assertEquals(envelope2, chooser.choose)
+    assertEquals(null, chooser.choose)
+    chooser.update(envelope2)
+    // Chooser's contents is currently: e4, e3, e2 
(System.err.println(mock.getEnvelopes))
+    assertEquals(envelope4, chooser.choose)
+    // This should be allowed, even though no message from envelope1.SSP is 
+    // available, since envelope4 triggered "all caught up" because its offset 
+    // matches the offset map for this SSP, and we still have an envelope for 
+    // envelope2.SSP in the queue.
+    assertEquals(envelope3, chooser.choose)
+    assertEquals(envelope2, chooser.choose)
+    assertEquals(null, chooser.choose)
+    // Fin.
+  }
+}
+
+object TestBootstrappingChooser {
+  // Test both BootstrappingChooser and DefaultChooser here. DefaultChooser 
+  // with just bootstrap stream offsets defined should behave just like a 
+  // plain vanilla bootstrapping chooser.
+  @Parameters
+  def parameters: java.util.Collection[Array[(MessageChooser, 
Map[SystemStreamPartition, String]) => MessageChooser]] = Arrays.asList(
+    Array((wrapped: MessageChooser, latestMessageOffsets: 
Map[SystemStreamPartition, String]) => new BootstrappingChooser(wrapped, 
latestMessageOffsets)),
+    Array((wrapped: MessageChooser, latestMessageOffsets: 
Map[SystemStreamPartition, String]) => new DefaultChooser(wrapped, 
bootstrapStreamOffsets = latestMessageOffsets)))
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
 
b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
new file mode 100644
index 0000000..12e12c0
--- /dev/null
+++ 
b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.chooser
+
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.samza.util.BlockingEnvelopeMap
+import org.apache.samza.system.IncomingMessageEnvelope
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.Partition
+
+class TestDefaultChooser {
+  val envelope1 = new IncomingMessageEnvelope(new 
SystemStreamPartition("kafka", "stream", new Partition(0)), null, null, 1);
+  val envelope2 = new IncomingMessageEnvelope(new 
SystemStreamPartition("kafka", "stream1", new Partition(1)), null, null, 2);
+  val envelope3 = new IncomingMessageEnvelope(new 
SystemStreamPartition("kafka", "stream2", new Partition(0)), null, null, 3);
+  val envelope4 = new IncomingMessageEnvelope(new 
SystemStreamPartition("kafka", "stream", new Partition(0)), "123", null, 4);
+  val envelope5 = new IncomingMessageEnvelope(new 
SystemStreamPartition("kafka", "stream", new Partition(1)), null, null, 5);
+  val envelope6 = new IncomingMessageEnvelope(new 
SystemStreamPartition("kafka", "stream", new Partition(1)), "321", null, 6);
+  val envelope7 = new IncomingMessageEnvelope(new 
SystemStreamPartition("kafka", "stream1", new Partition(0)), null, null, 7);
+
+  @Test
+  def testDefaultChooserWithBatchingPrioritizationAndBootstrapping {
+    val mock0 = new MockMessageChooser
+    val mock1 = new MockMessageChooser
+    val mock2 = new MockMessageChooser
+    val chooser = new DefaultChooser(
+      mock0,
+      Some(2),
+      Map(
+        envelope1.getSystemStreamPartition().getSystemStream -> Int.MaxValue,
+        envelope2.getSystemStreamPartition().getSystemStream -> 1),
+      Map(
+        Int.MaxValue -> mock1,
+        1 -> mock2),
+      Map(
+        envelope1.getSystemStreamPartition() -> "123",
+        envelope5.getSystemStreamPartition() -> "321"))
+
+    chooser.register(envelope1.getSystemStreamPartition, null)
+    chooser.register(envelope2.getSystemStreamPartition, null)
+    chooser.register(envelope3.getSystemStreamPartition, null)
+    chooser.register(envelope5.getSystemStreamPartition, null)
+    chooser.start
+    assertEquals(null, chooser.choose)
+
+    // Load with a non-bootstrap stream, and should still get null.
+    chooser.update(envelope3)
+    assertEquals(null, chooser.choose)
+
+    // Load with a bootstrap stream, should get that envelope.
+    chooser.update(envelope1)
+    assertEquals(envelope1, chooser.choose)
+
+    // Should block envelope3 since we have no message from envelope1's 
bootstrap stream.
+    assertEquals(null, chooser.choose)
+
+    // Load envelope2 from non-bootstrap stream with higher priority than 
envelope3.
+    chooser.update(envelope2)
+
+    // Should block envelope2 since we have no message from envelope1's 
bootstrap stream.
+    assertEquals(null, chooser.choose)
+
+    // Test batching by giving chooser envelope1 and envelope5, both from same 
stream, but envelope1 should be preferred partition.
+    chooser.update(envelope5)
+    chooser.update(envelope1)
+    assertEquals(envelope1, chooser.choose)
+
+    // Now, envelope5 is still loaded, and we've reached our batchSize limit, 
so loading envelope1 should still let envelope5 through.
+    chooser.update(envelope1)
+    assertEquals(envelope5, chooser.choose)
+    assertEquals(envelope1, chooser.choose)
+    assertEquals(null, chooser.choose)
+
+    // Now we're back to just envelope3, envelope2. Let's catch up envelope1's 
SSP using envelope4's offset.
+    chooser.update(envelope4)
+    assertEquals(envelope4, chooser.choose)
+
+    // Should still block envelopes 1 and 2 because the second partition 
hasn't caught up yet.
+    assertEquals(null, chooser.choose)
+
+    // Now catch up the second partition.
+    chooser.update(envelope6)
+    assertEquals(envelope6, chooser.choose)
+
+    // Cool, now no streams are being bootstrapped. Envelope2 should be 
prioritized above envelope3, even though envelope3 was added first.
+    assertEquals(envelope2, chooser.choose)
+
+    // We should still batch, and prefer envelope2's partition over envelope7, 
even though they're both from the same stream.
+    chooser.update(envelope7)
+    chooser.update(envelope2)
+    assertEquals(envelope2, chooser.choose)
+
+    // Now envelope2's partition has passed the batchSize, so we should get 7 
next.
+    chooser.update(envelope2)
+    assertEquals(envelope7, chooser.choose)
+    assertEquals(envelope2, chooser.choose)
+
+    // Now we should finally get the lowest priority non-bootstrap stream, 
envelope3.
+    assertEquals(envelope3, chooser.choose)
+    assertEquals(null, chooser.choose)
+  }
+}
+
+class MockBlockingEnvelopeMap extends BlockingEnvelopeMap {
+  def start = Unit
+  def stop = Unit
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-core/src/test/scala/org/apache/samza/system/chooser/TestRoundRobinChooser.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestRoundRobinChooser.scala
 
b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestRoundRobinChooser.scala
new file mode 100644
index 0000000..01802b9
--- /dev/null
+++ 
b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestRoundRobinChooser.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.chooser
+
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.samza.Partition
+import org.apache.samza.system.IncomingMessageEnvelope
+import org.apache.samza.system.SystemStreamPartition
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.runners.Parameterized.Parameters
+import java.util.Arrays
+
+@RunWith(value = classOf[Parameterized])
+class TestRoundRobinChooser(getChooser: () => MessageChooser) {
+  @Test
+  def testRoundRobinChooser {
+    val chooser = getChooser()
+    val envelope1 = new IncomingMessageEnvelope(new 
SystemStreamPartition("kafka", "stream", new Partition(0)), null, null, 1);
+    val envelope2 = new IncomingMessageEnvelope(new 
SystemStreamPartition("kafka", "stream", new Partition(1)), null, null, 2);
+    val envelope3 = new IncomingMessageEnvelope(new 
SystemStreamPartition("kafka", "stream1", new Partition(0)), null, null, 3);
+
+    chooser.register(envelope1.getSystemStreamPartition, null)
+    chooser.register(envelope2.getSystemStreamPartition, "")
+    chooser.register(envelope3.getSystemStreamPartition, "123")
+    chooser.start
+
+    assertEquals(null, chooser.choose)
+
+    // Test one message.
+    chooser.update(envelope1)
+    assertEquals(envelope1, chooser.choose)
+    assertEquals(null, chooser.choose)
+
+    // Verify simple ordering.
+    chooser.update(envelope1)
+    chooser.update(envelope2)
+    chooser.update(envelope3)
+
+    assertEquals(envelope1, chooser.choose)
+    assertEquals(envelope2, chooser.choose)
+    assertEquals(envelope3, chooser.choose)
+    assertEquals(null, chooser.choose)
+
+    // Verify mixed ordering.
+    chooser.update(envelope2)
+    chooser.update(envelope1)
+
+    assertEquals(envelope2, chooser.choose)
+    assertEquals(envelope1, chooser.choose)
+
+    chooser.update(envelope1)
+    chooser.update(envelope2)
+
+    assertEquals(envelope1, chooser.choose)
+    assertEquals(envelope2, chooser.choose)
+    assertEquals(null, chooser.choose)
+
+    // Verify simple ordering with different starting envelope.
+    chooser.update(envelope2)
+    chooser.update(envelope1)
+    chooser.update(envelope3)
+
+    assertEquals(envelope2, chooser.choose)
+    assertEquals(envelope1, chooser.choose)
+    assertEquals(envelope3, chooser.choose)
+    assertEquals(null, chooser.choose)
+  }
+}
+
+object TestRoundRobinChooser {
+  // Test both RoundRobinChooser and DefaultChooser here. DefaultChooser with 
+  // no batching, prioritization, or bootstrapping should default to just a 
+  // plain vanilla round robin chooser.
+  @Parameters
+  def parameters: java.util.Collection[Array[() => MessageChooser]] = 
Arrays.asList(Array(() => new RoundRobinChooser), Array(() => new 
DefaultChooser))
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-core/src/test/scala/org/apache/samza/system/chooser/TestTieredPriorityChooser.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestTieredPriorityChooser.scala
 
b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestTieredPriorityChooser.scala
new file mode 100644
index 0000000..4cde630
--- /dev/null
+++ 
b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestTieredPriorityChooser.scala
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.chooser
+
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.samza.system.IncomingMessageEnvelope
+import scala.collection.immutable.Queue
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.Partition
+import org.apache.samza.SamzaException
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.runners.Parameterized.Parameters
+import java.util.Arrays
+import org.apache.samza.system.SystemStream
+
+@RunWith(value = classOf[Parameterized])
+class TestTieredPriorityChooser(getChooser: (Map[SystemStream, Int], Map[Int, 
MessageChooser], MessageChooser) => MessageChooser) {
+  val envelope1 = new IncomingMessageEnvelope(new 
SystemStreamPartition("kafka", "stream", new Partition(0)), null, null, 1);
+  val envelope2 = new IncomingMessageEnvelope(new 
SystemStreamPartition("kafka", "stream1", new Partition(1)), null, null, 2);
+  val envelope3 = new IncomingMessageEnvelope(new 
SystemStreamPartition("kafka", "stream1", new Partition(0)), null, null, 3);
+  val envelope4 = new IncomingMessageEnvelope(new 
SystemStreamPartition("kafka", "stream", new Partition(0)), "123", null, 4);
+
+  @Test
+  def testChooserShouldStartStopAndRegister {
+    val mock0 = new MockMessageChooser
+    val mock1 = new MockMessageChooser
+    val chooser = getChooser(
+      Map(envelope1.getSystemStreamPartition -> 1),
+      Map(1 -> mock1),
+      mock0)
+
+    chooser.register(envelope1.getSystemStreamPartition, "foo")
+    chooser.start
+    assertEquals("foo", mock1.registers(envelope1.getSystemStreamPartition))
+    assertEquals(1, mock0.starts)
+    assertEquals(1, mock1.starts)
+    chooser.stop
+    assertEquals(1, mock0.stops)
+    assertEquals(1, mock1.stops)
+  }
+
+  @Test
+  def testChooserShouldFallBackToDefault {
+    val mock = new MockMessageChooser
+    val chooser = getChooser(
+      Map(),
+      Map(),
+      mock)
+
+    chooser.register(envelope1.getSystemStreamPartition, null)
+    chooser.start
+    assertEquals(null, chooser.choose)
+    chooser.update(envelope1)
+    assertEquals(envelope1, chooser.choose)
+    assertEquals(null, chooser.choose)
+  }
+
+  @Test
+  def testChooserShouldFailWithNoDefault {
+    val mock = new MockMessageChooser
+    val chooser = getChooser(
+      Map(envelope1.getSystemStreamPartition.getSystemStream -> 0),
+      Map(0 -> mock),
+      null)
+
+    // The SSP for envelope2 is not defined as a priority stream.
+    chooser.register(envelope2.getSystemStreamPartition, null)
+    chooser.start
+    assertEquals(null, chooser.choose)
+
+    try {
+      chooser.update(envelope2)
+      fail("Should have failed due to missing default chooser.")
+    } catch {
+      case e: SamzaException => // Expected.
+    }
+  }
+
+  @Test
+  def testChooserWithSingleStream {
+    val mock = new MockMessageChooser
+    val chooser = getChooser(
+      Map(envelope1.getSystemStreamPartition.getSystemStream -> 0),
+      Map(0 -> mock),
+      new MockMessageChooser)
+
+    chooser.register(envelope1.getSystemStreamPartition, null)
+    chooser.start
+
+    assertEquals(null, chooser.choose)
+    chooser.update(envelope1)
+    chooser.update(envelope4)
+    assertEquals(envelope1, chooser.choose)
+    assertEquals(envelope4, chooser.choose)
+    assertEquals(null, chooser.choose)
+
+    chooser.update(envelope4)
+    chooser.update(envelope1)
+    assertEquals(envelope4, chooser.choose)
+    assertEquals(envelope1, chooser.choose)
+    assertEquals(null, chooser.choose)
+  }
+
+  @Test
+  def testChooserWithSingleStreamWithTwoPartitions {
+    val mock = new MockMessageChooser
+    val chooser = getChooser(
+      Map(envelope2.getSystemStreamPartition.getSystemStream -> 0),
+      Map(0 -> mock),
+      new MockMessageChooser)
+
+    chooser.register(envelope2.getSystemStreamPartition, null)
+    chooser.register(envelope3.getSystemStreamPartition, null)
+    chooser.start
+
+    assertEquals(null, chooser.choose)
+    chooser.update(envelope2)
+    chooser.update(envelope3)
+    assertEquals(envelope2, chooser.choose)
+    assertEquals(envelope3, chooser.choose)
+    assertEquals(null, chooser.choose)
+
+    chooser.update(envelope3)
+    chooser.update(envelope2)
+    assertEquals(envelope3, chooser.choose)
+    assertEquals(envelope2, chooser.choose)
+    assertEquals(null, chooser.choose)
+  }
+
+  @Test
+  def testChooserWithTwoStreamsOfEqualPriority {
+    val mock = new MockMessageChooser
+    val chooser = getChooser(
+      Map(
+        envelope1.getSystemStreamPartition.getSystemStream -> 0,
+        envelope2.getSystemStreamPartition.getSystemStream -> 0),
+      Map(0 -> mock),
+      new MockMessageChooser)
+
+    chooser.register(envelope1.getSystemStreamPartition, null)
+    chooser.register(envelope2.getSystemStreamPartition, null)
+    chooser.start
+
+    assertEquals(null, chooser.choose)
+    chooser.update(envelope1)
+    chooser.update(envelope4)
+    assertEquals(envelope1, chooser.choose)
+    assertEquals(envelope4, chooser.choose)
+    assertEquals(null, chooser.choose)
+
+    chooser.update(envelope4)
+    chooser.update(envelope1)
+    assertEquals(envelope4, chooser.choose)
+    assertEquals(envelope1, chooser.choose)
+    assertEquals(null, chooser.choose)
+
+    chooser.update(envelope2)
+    chooser.update(envelope4)
+    assertEquals(envelope2, chooser.choose)
+    assertEquals(envelope4, chooser.choose)
+    assertEquals(null, chooser.choose)
+
+    chooser.update(envelope1)
+    chooser.update(envelope2)
+    assertEquals(envelope1, chooser.choose)
+    assertEquals(envelope2, chooser.choose)
+    assertEquals(null, chooser.choose)
+  }
+
+  @Test
+  def testChooserWithTwoStreamsOfDifferentPriority {
+    val mock0 = new MockMessageChooser
+    val mock1 = new MockMessageChooser
+    val chooser = getChooser(
+      Map(
+        envelope1.getSystemStreamPartition.getSystemStream -> 1,
+        envelope2.getSystemStreamPartition.getSystemStream -> 0),
+      Map(
+        0 -> mock0,
+        1 -> mock1),
+      new MockMessageChooser)
+
+    chooser.register(envelope1.getSystemStreamPartition, null)
+    chooser.register(envelope2.getSystemStreamPartition, null)
+    chooser.start
+
+    assertEquals(null, chooser.choose)
+    chooser.update(envelope1)
+    chooser.update(envelope4)
+    assertEquals(envelope1, chooser.choose)
+    assertEquals(envelope4, chooser.choose)
+    assertEquals(null, chooser.choose)
+
+    chooser.update(envelope4)
+    chooser.update(envelope1)
+    assertEquals(envelope4, chooser.choose)
+    assertEquals(envelope1, chooser.choose)
+    assertEquals(null, chooser.choose)
+
+    chooser.update(envelope2)
+    chooser.update(envelope4)
+    // Reversed here because envelope4.SSP=envelope1.SSP which is higher 
+    // priority.
+    assertEquals(envelope4, chooser.choose)
+    assertEquals(envelope2, chooser.choose)
+    assertEquals(null, chooser.choose)
+
+    chooser.update(envelope1)
+    chooser.update(envelope2)
+    assertEquals(envelope1, chooser.choose)
+    assertEquals(envelope2, chooser.choose)
+    assertEquals(null, chooser.choose)
+
+    // Just the low priority stream.
+    chooser.update(envelope2)
+    assertEquals(envelope2, chooser.choose)
+    assertEquals(null, chooser.choose)
+  }
+}
+
+object TestTieredPriorityChooser {
+  // Test both PriorityChooser and DefaultChooser here. DefaultChooser with 
+  // just priorities defined should behave just like plain vanilla priority 
+  // chooser.
+  @Parameters
+  def parameters: java.util.Collection[Array[(Map[SystemStream, Int], Map[Int, 
MessageChooser], MessageChooser) => MessageChooser]] = Arrays.asList(
+    Array((priorities: Map[SystemStream, Int], choosers: Map[Int, 
MessageChooser], default: MessageChooser) => new 
TieredPriorityChooser(priorities, choosers, default)),
+    Array((priorities: Map[SystemStream, Int], choosers: Map[Int, 
MessageChooser], default: MessageChooser) => new DefaultChooser(default, None, 
priorities, choosers)))
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index 183c6cc..c5487b9 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -25,19 +25,36 @@ import org.apache.samza.util.ClientUtilTopicMetadataStore
 import kafka.api.TopicMetadata
 import scala.collection.JavaConversions._
 import org.apache.samza.system.SystemAdmin
+import org.apache.samza.SamzaException
+import kafka.consumer.SimpleConsumer
+import kafka.utils.Utils
+import kafka.client.ClientUtils
+import java.util.Random
+import kafka.api.TopicMetadataRequest
+import kafka.common.TopicAndPartition
+import kafka.api.PartitionOffsetRequestInfo
+import kafka.api.OffsetRequest
+import kafka.api.FetchRequestBuilder
+import org.apache.samza.system.SystemStreamPartition
+import kafka.common.ErrorMapping
+import grizzled.slf4j.Logging
 
 class KafkaSystemAdmin(
   systemName: String,
   // TODO whenever Kafka decides to make the Set[Broker] class public, let's 
switch to Set[Broker] here.
   brokerListString: String,
-  clientId: String = UUID.randomUUID.toString) extends SystemAdmin {
+  timeout: Int = Int.MaxValue,
+  bufferSize: Int = 1024000,
+  clientId: String = UUID.randomUUID.toString) extends SystemAdmin with 
Logging {
 
-  def getPartitions(streamName: String): java.util.Set[Partition] = {
-    val getTopicMetadata = (topics: Set[String]) => {
-      new ClientUtilTopicMetadataStore(brokerListString, clientId)
-        .getTopicInfo(topics)
-    }
+  val rand = new Random
+
+  private def getTopicMetadata(topics: Set[String]) = {
+    new ClientUtilTopicMetadataStore(brokerListString, clientId)
+      .getTopicInfo(topics)
+  }
 
+  def getPartitions(streamName: String): java.util.Set[Partition] = {
     val metadata = TopicMetadataCache.getTopicMetadata(
       Set(streamName),
       systemName,
@@ -48,4 +65,93 @@ class KafkaSystemAdmin(
       .map(pm => new Partition(pm.partitionId))
       .toSet[Partition]
   }
+
+  def getLastOffsets(streams: java.util.Set[String]) = {
+    var offsets = Map[SystemStreamPartition, String]()
+    var done = false
+    var consumer: SimpleConsumer = null
+
+    debug("Fetching offsets for: %s" format streams)
+
+    while (!done) {
+      try {
+        val metadata = TopicMetadataCache.getTopicMetadata(
+          streams.toSet,
+          systemName,
+          getTopicMetadata)
+
+        debug("Got metadata for streams: %s" format metadata)
+
+        // Break topic metadata topic/partitions into per-broker map.
+        val brokersToTopicPartitions = metadata
+          .values
+          // Convert the topic metadata to a Seq[(Broker, TopicAndPartition)] 
+          .flatMap(topicMetadata => topicMetadata
+            .partitionsMetadata
+            // Convert Seq[PartitionMetadata] to Seq[(Broker, 
TopicAndPartition)]
+            .map(partitionMetadata => {
+              ErrorMapping.maybeThrowException(partitionMetadata.errorCode)
+              val topicAndPartition = new 
TopicAndPartition(topicMetadata.topic, partitionMetadata.partitionId)
+              val leader = partitionMetadata
+                .leader
+                .getOrElse(throw new SamzaException("Need leaders for all 
partitions when fetching offsets. No leader available for TopicAndPartition: 
%s" format topicAndPartition))
+              (leader, topicAndPartition)
+            }))
+          // Convert to a Map[Broker, Seq[(Broker, TopicAndPartition)]]
+          .groupBy(_._1)
+          // Convert to a Map[Broker, Seq[TopicAndPartition]]
+          .mapValues(_.map(_._2))
+
+        debug("Got topic partition data for brokers: %s" format 
brokersToTopicPartitions)
+
+        // Get the latest offsets for each topic and partition.
+        for ((broker, topicsAndPartitions) <- brokersToTopicPartitions) {
+          val partitionOffsetInfo = topicsAndPartitions
+            .map(topicAndPartition => (topicAndPartition, 
PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
+            .toMap
+          consumer = new SimpleConsumer(broker.host, broker.port, timeout, 
bufferSize, clientId)
+          val brokerOffsets = consumer
+            .getOffsetsBefore(new OffsetRequest(partitionOffsetInfo))
+            .partitionErrorAndOffsets
+            .map(partitionAndOffset => {
+              if (partitionAndOffset._2.offsets.head <= 0) {
+                debug("Filtering out empty topic partition: %s" format 
partitionAndOffset)
+              }
+
+              partitionAndOffset
+            })
+            .filter(_._2.offsets.head > 0)
+            // Kafka returns 1 greater than the offset of the last message in 
+            // the topic, so subtract one to fetch the last message.
+            .mapValues(_.offsets.head - 1)
+
+          debug("Got offsets: %s" format brokerOffsets)
+          debug("Shutting down consumer for %s:%s." format (broker.host, 
broker.port))
+
+          consumer.close
+
+          for ((topicAndPartition, offset) <- brokerOffsets) {
+            offsets += new SystemStreamPartition(systemName, 
topicAndPartition.topic, new Partition(topicAndPartition.partition)) -> 
offset.toString
+          }
+        }
+
+        done = true
+      } catch {
+        case e: InterruptedException =>
+          info("Interrupted while fetching last offsets, so forwarding.")
+          if (consumer != null) {
+            consumer.close
+          }
+          throw e
+        case e: Exception =>
+          // Retry.
+          warn("Unable to fetch last offsets for streams due to: %s, %s. 
Retrying. Turn on debugging to get a full stack trace." format (e.getMessage, 
streams))
+          debug(e)
+      }
+    }
+
+    info("Got latest offsets for streams: %s, %s" format (streams, offsets))
+
+    offsets
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index ba08af8..a11a72a 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -103,7 +103,15 @@ class KafkaSystemFactory extends SystemFactory {
     val producerConfig = config.getKafkaSystemProducerConfig(systemName, 
clientId)
     val brokerListString = Option(producerConfig.brokerList)
       .getOrElse(throw new SamzaException("No broker list defined in config 
for %s." format systemName))
+    val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, 
clientId)
+    val timeout = consumerConfig.socketTimeoutMs
+    val bufferSize = consumerConfig.socketReceiveBufferBytes
 
-    new KafkaSystemAdmin(systemName, brokerListString, clientId)
+    new KafkaSystemAdmin(
+      systemName,
+      brokerListString,
+      timeout,
+      bufferSize,
+      clientId)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-kafka/src/test/scala/org/apache/samza/checkpoint/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/TestKafkaCheckpointManager.scala
 
b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/TestKafkaCheckpointManager.scala
index baf4695..cad2231 100644
--- 
a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/TestKafkaCheckpointManager.scala
+++ 
b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/TestKafkaCheckpointManager.scala
@@ -105,6 +105,7 @@ object TestKafkaCheckpointManager {
     Utils.rm(server1.config.logDirs)
     Utils.rm(server2.config.logDirs)
     Utils.rm(server3.config.logDirs)
+    zookeeper.shutdown
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
new file mode 100644
index 0000000..f0c6f8a
--- /dev/null
+++ 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -0,0 +1,212 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.system.kafka
+
+import org.junit.Assert._
+import org.junit.Test
+import kafka.zk.EmbeddedZookeeper
+import org.apache.samza.checkpoint.Checkpoint
+import org.junit.BeforeClass
+import org.junit.AfterClass
+import org.apache.samza.util.ClientUtilTopicMetadataStore
+import org.I0Itec.zkclient.ZkClient
+import kafka.admin.AdminUtils
+import org.apache.samza.util.TopicMetadataStore
+import kafka.producer.ProducerConfig
+import kafka.utils.TestUtils
+import kafka.common.ErrorMapping
+import kafka.utils.TestZKUtils
+import kafka.server.KafkaServer
+import kafka.producer.Producer
+import kafka.server.KafkaConfig
+import kafka.utils.Utils
+import org.apache.samza.system.SystemStream
+import kafka.utils.ZKStringSerializer
+import scala.collection.JavaConversions._
+import kafka.producer.KeyedMessage
+import kafka.message.MessageAndMetadata
+import scala.collection.mutable.ArrayBuffer
+import kafka.consumer.Consumer
+import kafka.consumer.ConsumerConfig
+import java.util.Properties
+import com.sun.xml.internal.xsom.impl.parser.state.group
+
+object TestKafkaSystemAdmin {
+  val TOPIC = "input"
+  val TOTAL_PARTITIONS = 50
+  val REPLICATION_FACTOR = 2
+
+  val zkConnect: String = TestZKUtils.zookeeperConnect
+  var zkClient: ZkClient = null
+  val zkConnectionTimeout = 6000
+  val zkSessionTimeout = 6000
+  val brokerId1 = 0
+  val brokerId2 = 1
+  val brokerId3 = 2
+  val ports = TestUtils.choosePorts(3)
+  val (port1, port2, port3) = (ports(0), ports(1), ports(2))
+
+  val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
+  val config1 = new KafkaConfig(props1) {
+    override val hostName = "localhost"
+    override val numPartitions = 1
+    override val zkConnect = TestKafkaSystemAdmin.zkConnect + "/"
+  }
+  val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
+  val config2 = new KafkaConfig(props2) {
+    override val hostName = "localhost"
+    override val numPartitions = 1
+    override val zkConnect = TestKafkaSystemAdmin.zkConnect + "/"
+  }
+  val props3 = TestUtils.createBrokerConfig(brokerId3, port3)
+  val config3 = new KafkaConfig(props3) {
+    override val hostName = "localhost"
+    override val numPartitions = 1
+    override val zkConnect = TestKafkaSystemAdmin.zkConnect + "/"
+  }
+
+  val config = new java.util.Properties()
+  val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, 
port3)
+  config.put("metadata.broker.list", brokers)
+  config.put("producer.type", "sync")
+  config.put("request.required.acks", "-1")
+  config.put("serializer.class", "kafka.serializer.StringEncoder");
+  val producerConfig = new ProducerConfig(config)
+  var producer: Producer[String, String] = null
+  var zookeeper: EmbeddedZookeeper = null
+  var server1: KafkaServer = null
+  var server2: KafkaServer = null
+  var server3: KafkaServer = null
+  var metadataStore: TopicMetadataStore = null
+
+  @BeforeClass
+  def beforeSetupServers {
+    zookeeper = new EmbeddedZookeeper(zkConnect)
+    server1 = TestUtils.createServer(config1)
+    server2 = TestUtils.createServer(config2)
+    server3 = TestUtils.createServer(config3)
+    zkClient = new ZkClient(zkConnect + "/", 6000, 6000, ZKStringSerializer)
+    producer = new Producer(producerConfig)
+    metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name")
+  }
+
+  def createTopic {
+    AdminUtils.createTopic(
+      zkClient,
+      TOPIC,
+      TOTAL_PARTITIONS,
+      REPLICATION_FACTOR)
+  }
+
+  def validateTopic {
+    var done = false
+    var retries = 0
+    val maxRetries = 100
+
+    while (!done && retries < maxRetries) {
+      try {
+        val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(TOPIC), 
"kafka", metadataStore.getTopicInfo)
+        val topicMetadata = topicMetadataMap(TOPIC)
+        val errorCode = topicMetadata.errorCode
+
+        ErrorMapping.maybeThrowException(errorCode)
+
+        done = true
+      } catch {
+        case e: Throwable =>
+          System.err.println("Got exception while validating test topics. 
Waiting and retrying.", e)
+          retries += 1
+          Thread.sleep(500)
+      }
+    }
+
+    if (retries >= maxRetries) {
+      fail("Unable to successfully create topics. Tried to validate %s times." 
format retries)
+    }
+  }
+
+  def getConsumerConnector = {
+    val props = new Properties
+
+    props.put("zookeeper.connect", zkConnect)
+    props.put("group.id", "test")
+    props.put("auto.offset.reset", "smallest")
+
+    val consumerConfig = new ConsumerConfig(props)
+    Consumer.create(consumerConfig)
+  }
+
+  @AfterClass
+  def afterCleanLogDirs {
+    server1.shutdown
+    server1.awaitShutdown()
+    server2.shutdown
+    server2.awaitShutdown()
+    server3.shutdown
+    server3.awaitShutdown()
+    Utils.rm(server1.config.logDirs)
+    Utils.rm(server2.config.logDirs)
+    Utils.rm(server3.config.logDirs)
+    zkClient.close
+    zookeeper.shutdown
+  }
+}
+
+class TestKafkaSystemAdmin {
+  import TestKafkaSystemAdmin._
+
+  @Test
+  def testShouldGetLastOffsets {
+    val systemName = "test"
+    val systemAdmin = new KafkaSystemAdmin(systemName, brokers)
+
+    // Get a non-existent topic.
+    val initialInputOffsets = systemAdmin.getLastOffsets(Set("foo"))
+    assertEquals(0, initialInputOffsets.size)
+
+    // Create an empty topic with 50 partitions, but with no offsets.
+    createTopic
+    validateTopic
+    val createdInputOffsets = systemAdmin.getLastOffsets(Set(TOPIC))
+    assertEquals(0, createdInputOffsets.size)
+
+    // Add a new message to one of the partitions.
+    producer.send(new KeyedMessage(TOPIC, "key1", "val1"))
+    val oneMessageInputOffsets = systemAdmin.getLastOffsets(Set(TOPIC))
+    val ssp = oneMessageInputOffsets.keySet.head
+    assertEquals(1, oneMessageInputOffsets.size)
+    assertEquals(systemName, ssp.getSystem)
+    assertEquals(TOPIC, ssp.getStream)
+    // key1 gets hash-mod'd to partition 48.
+    assertEquals(48, ssp.getPartition.getPartitionId)
+
+    // Validate that a fetch will return the message.
+    val connector = getConsumerConnector
+    var stream = connector.createMessageStreams(Map(TOPIC -> 
1)).get(TOPIC).get.get(0).iterator
+    val message = stream.next
+    val text = new String(message.message, "UTF-8")
+    connector.shutdown
+    // Message's offset should match the expected latest offset.
+    assertEquals(oneMessageInputOffsets(ssp), message.offset.toString)
+    assertEquals("val1", text)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
 
b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
index 7d4e20a..9602a52 100644
--- 
a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
+++ 
b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.samza.test.integration
 
 import org.apache.samza.task.StreamTask
@@ -175,6 +194,7 @@ object TestStatefulTask {
     Utils.rm(server2.config.logDirs)
     Utils.rm(server3.config.logDirs)
     zkClient.close
+    zookeeper.shutdown
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7bbde2cc/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
 
b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
index 6805052..c9f7029 100644
--- 
a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
+++ 
b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
@@ -41,6 +41,7 @@ import TestSamzaAppMasterTaskManager._
 import org.apache.samza.system.SystemAdmin
 import org.apache.samza.system.SystemFactory
 import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.SamzaException
 
 object TestSamzaAppMasterTaskManager {
   def getContainer(containerId: ContainerId) = new Container {
@@ -428,4 +429,6 @@ class MockSystemFactory extends SystemFactory {
 
 class MockSinglePartitionManager extends SystemAdmin {
   def getPartitions(streamName: String) = Set(new Partition(0))
+  
+  def getLastOffsets(streams: java.util.Set[String]) = throw new 
SamzaException("Need to implement this")
 }

Reply via email to