http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-kafka/src/test/scala/old/checkpoint/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/test/scala/old/checkpoint/TestKafkaCheckpointManager.scala 
b/samza-kafka/src/test/scala/old/checkpoint/TestKafkaCheckpointManager.scala
deleted file mode 100644
index 2c0304f..0000000
--- a/samza-kafka/src/test/scala/old/checkpoint/TestKafkaCheckpointManager.scala
+++ /dev/null
@@ -1,430 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package old.checkpoint
-
-import java.util.Properties
-import kafka.admin.AdminUtils
-import kafka.common.{InvalidMessageSizeException, 
UnknownTopicOrPartitionException}
-import kafka.message.InvalidMessageException
-import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{TestUtils, TestZKUtils, Utils, ZKStringSerializer}
-import kafka.zk.EmbeddedZookeeper
-import org.I0Itec.zkclient.ZkClient
-import org.apache.kafka.clients.producer.{Producer, ProducerRecord, 
KafkaProducer, ProducerConfig}
-import org.apache.samza.checkpoint.{CheckpointManager, Checkpoint}
-import org.apache.samza.config.SystemConfig._
-import org.apache.samza.config.{JobConfig, KafkaProducerConfig, MapConfig}
-import org.apache.samza.container.TaskName
-import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
-import org.apache.samza.coordinator.MockSystemFactory
-import org.apache.samza.coordinator.stream.messages.SetMigrationMetaMessage
-import 
org.apache.samza.coordinator.stream.{MockCoordinatorStreamSystemFactory, 
CoordinatorStreamSystemFactory}
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.serializers.CheckpointSerde
-import org.apache.samza.storage.ChangelogPartitionManager
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.util.{ClientUtilTopicMetadataStore, TopicMetadataStore}
-import org.apache.samza.{Partition, SamzaException}
-import org.junit.Assert._
-import org.junit._
-import scala.collection.JavaConversions._
-import scala.collection._
-
-class TestKafkaCheckpointManager {
-
-  val checkpointTopic = "checkpoint-topic"
-  val serdeCheckpointTopic = "checkpoint-topic-invalid-serde"
-  val checkpointTopicConfig = 
KafkaCheckpointManagerFactory.getCheckpointTopicProperties(null)
-  val zkConnect: String = TestZKUtils.zookeeperConnect
-  var zkClient: ZkClient = null
-  val zkConnectionTimeout = 6000
-  val zkSessionTimeout = 6000
-
-  val brokerId1 = 0
-  val brokerId2 = 1
-  val brokerId3 = 2
-  val ports = TestUtils.choosePorts(3)
-  val (port1, port2, port3) = (ports(0), ports(1), ports(2))
-
-  val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
-  props1.put("controlled.shutdown.enable", "true")
-  val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
-  props1.put("controlled.shutdown.enable", "true")
-  val props3 = TestUtils.createBrokerConfig(brokerId3, port3)
-  props1.put("controlled.shutdown.enable", "true")
-
-  val config = new java.util.HashMap[String, Object]()
-  val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, 
port3)
-  config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
-  config.put("acks", "all")
-  config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
-  config.put(ProducerConfig.RETRIES_CONFIG, (new 
Integer(java.lang.Integer.MAX_VALUE-1)).toString)
-  config.putAll(KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES)
-  val producerConfig = new KafkaProducerConfig("kafka", "i001", config)
-  val partition = new Partition(0)
-  val partition2 = new Partition(1)
-  val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", 
partition) -> "123"))
-  val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", 
partition) -> "12345"))
-  var zookeeper: EmbeddedZookeeper = null
-  var server1: KafkaServer = null
-  var server2: KafkaServer = null
-  var server3: KafkaServer = null
-  var metadataStore: TopicMetadataStore = null
-
-  val systemStreamPartitionGrouperFactoryString = 
classOf[GroupByPartitionFactory].getCanonicalName
-
-  @Before
-  def beforeSetupServers {
-    zookeeper = new EmbeddedZookeeper(zkConnect)
-    server1 = TestUtils.createServer(new KafkaConfig(props1))
-    server2 = TestUtils.createServer(new KafkaConfig(props2))
-    server3 = TestUtils.createServer(new KafkaConfig(props3))
-    metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name")
-  }
-
-  @After
-  def afterCleanLogDirs {
-    server1.shutdown
-    server1.awaitShutdown()
-    server2.shutdown
-    server2.awaitShutdown()
-    server3.shutdown
-    server3.awaitShutdown()
-    Utils.rm(server1.config.logDirs)
-    Utils.rm(server2.config.logDirs)
-    Utils.rm(server3.config.logDirs)
-    zookeeper.shutdown
-  }
-
-  private def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint, 
cpTopic: String = checkpointTopic) = {
-    val producer: Producer[Array[Byte], Array[Byte]] = new 
KafkaProducer(producerConfig.getProducerProperties)
-    val record = new ProducerRecord(
-      cpTopic,
-      0,
-      KafkaCheckpointLogKey.getCheckpointKey(taskName).toBytes(),
-      new CheckpointSerde().toBytes(checkpoint)
-    )
-    try {
-      producer.send(record).get()
-    } catch {
-      case e: Exception => println(e.getMessage)
-    } finally {
-      producer.close()
-    }
-  }
-
-  private def writeChangeLogPartitionMapping(changelogMapping: Map[TaskName, 
Integer], cpTopic: String = checkpointTopic) = {
-    val producer: Producer[Array[Byte], Array[Byte]] = new 
KafkaProducer(producerConfig.getProducerProperties)
-    val record = new ProducerRecord(
-      cpTopic,
-      0,
-      KafkaCheckpointLogKey.getChangelogPartitionMappingKey().toBytes(),
-      new CheckpointSerde().changelogPartitionMappingToBytes(changelogMapping)
-    )
-    try {
-      producer.send(record).get()
-    } catch {
-      case e: Exception => println(e.getMessage)
-    } finally {
-      producer.close()
-    }
-  }
-
-  private def createCheckpointTopic(cpTopic: String = checkpointTopic) = {
-    val zkClient = new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
-    try {
-      AdminUtils.createTopic(
-        zkClient,
-        checkpointTopic,
-        1,
-        1,
-        checkpointTopicConfig)
-    } catch {
-      case e: Exception => println(e.getMessage)
-    } finally {
-      zkClient.close
-    }
-  }
-
-  @Test
-  def testStartFailureWithNoCheckpointTopic() {
-    try {
-      val map = new java.util.HashMap[String, String]()
-      val mapConfig = Map(
-        "task.checkpoint.factory" -> 
"org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory",
-        JobConfig.JOB_NAME -> "test",
-        JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
-        JobConfig.JOB_CONTAINER_COUNT -> "2",
-        "task.inputs" -> "test.stream1",
-        "task.checkpoint.system" -> "test",
-        SYSTEM_FACTORY.format("test") -> 
classOf[MockSystemFactory].getCanonicalName,
-        SYSTEM_FACTORY.format("coordinator") -> 
classOf[MockCoordinatorStreamSystemFactory].getName)
-      // Enable consumer caching
-      MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
-
-      val config = new MapConfig(mapConfig)
-      val migrate = new KafkaCheckpointMigration
-      val oldCheckpointManager = getKafkaCheckpointManager
-      oldCheckpointManager.start
-      fail("KafkaCheckpointManager start should have failed")
-    } catch {
-      case se: SamzaException => assertEquals(se.getMessage, "Failed to start 
KafkaCheckpointManager for non-existing checkpoint topic. " +
-        "KafkaCheckpointManager should only be used for migration purpose.")
-    }
-  }
-
-  @Test
-  def testMigrationWithNoCheckpointTopic() {
-    try {
-      val map = new java.util.HashMap[String, String]()
-      val mapConfig = Map(
-        "task.checkpoint.factory" -> 
"org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory",
-        JobConfig.JOB_NAME -> "test",
-        JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
-        JobConfig.JOB_CONTAINER_COUNT -> "2",
-        "task.inputs" -> "test.stream1",
-        "task.checkpoint.system" -> "test",
-        SYSTEM_FACTORY.format("test") -> 
classOf[MockSystemFactory].getCanonicalName,
-        SYSTEM_FACTORY.format("coordinator") -> 
classOf[MockCoordinatorStreamSystemFactory].getName)
-      // Enable consumer caching
-      MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
-
-      val config = new MapConfig(mapConfig)
-      val migrate = new KafkaCheckpointMigration
-      val oldCheckpointManager = getKafkaCheckpointManager
-
-      // Write a couple of checkpoints in the old checkpoint topic
-      val task1 = new TaskName(partition.toString)
-      val task2 = new TaskName(partition2.toString)
-
-      val changelogMapping = Map()
-
-      // Initialize coordinator stream
-      val coordinatorFactory = new CoordinatorStreamSystemFactory()
-      val coordinatorSystemConsumer = 
coordinatorFactory.getCoordinatorStreamSystemConsumer(config, new 
MetricsRegistryMap)
-      val coordinatorSystemProducer = 
coordinatorFactory.getCoordinatorStreamSystemProducer(config, new 
MetricsRegistryMap)
-      coordinatorSystemConsumer.register()
-      coordinatorSystemConsumer.start()
-
-      
assertEquals(coordinatorSystemConsumer.getBootstrappedStream(SetMigrationMetaMessage.TYPE).size,
 0)
-      coordinatorSystemConsumer.stop
-
-      // Start the migration
-      def getManager() = getKafkaCheckpointManager
-      migrate.migrate(config, getManager)
-      // Ensure migration step does not create the non-existing checkpoint 
topic
-      assertFalse(getManager.topicExists)
-
-      // Verify if the checkpoints have been migrated
-      val newCheckpointManager = new 
CheckpointManager(coordinatorSystemProducer, coordinatorSystemConsumer, "test")
-      newCheckpointManager.register(task1)
-      newCheckpointManager.register(task2)
-      newCheckpointManager.start()
-      assertNull(newCheckpointManager.readLastCheckpoint(task1))
-      assertNull(newCheckpointManager.readLastCheckpoint(task2))
-      newCheckpointManager.stop()
-
-      // Verify if the changelogPartitionInfo has been migrated
-      val newChangelogManager = new 
ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, 
"test")
-      newChangelogManager.start
-      val newChangelogMapping = 
newChangelogManager.readChangeLogPartitionMapping()
-      newChangelogManager.stop
-      assertEquals(newChangelogMapping.toMap, changelogMapping)
-
-      // Check for migration message
-      coordinatorSystemConsumer.register()
-      coordinatorSystemConsumer.start()
-      
assertEquals(coordinatorSystemConsumer.getBootstrappedStream(SetMigrationMetaMessage.TYPE).size,
 1)
-      coordinatorSystemConsumer.stop()
-    }
-    finally {
-      MockCoordinatorStreamSystemFactory.disableMockConsumerCache()
-    }
-  }
-
-  @Test
-  def testMigration() {
-    try {
-      val map = new java.util.HashMap[String, String]()
-      val mapConfig = Map(
-        "task.checkpoint.factory" -> 
"org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory",
-        JobConfig.JOB_NAME -> "test",
-        JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
-        JobConfig.JOB_CONTAINER_COUNT -> "2",
-        "task.inputs" -> "test.stream1",
-        "task.checkpoint.system" -> "test",
-        SYSTEM_FACTORY.format("test") -> 
classOf[MockSystemFactory].getCanonicalName,
-        SYSTEM_FACTORY.format("coordinator") -> 
classOf[MockCoordinatorStreamSystemFactory].getName)
-      // Enable consumer caching
-      MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
-
-      val config = new MapConfig(mapConfig)
-      val migrate = new KafkaCheckpointMigration
-      val oldCheckpointManager = getKafkaCheckpointManager
-
-      createCheckpointTopic()
-      oldCheckpointManager.validateTopic
-
-      // Write a couple of checkpoints in the old checkpoint topic
-      val task1 = new TaskName(partition.toString)
-      val task2 = new TaskName(partition2.toString)
-      writeCheckpoint(task1, cp1)
-      writeCheckpoint(task2, cp2)
-
-      val changelogMapping = Map(task1 -> 1.asInstanceOf[Integer], task2 -> 
10.asInstanceOf[Integer])
-      // Write changelog partition info to the old checkpoint topic
-      writeChangeLogPartitionMapping(changelogMapping)
-      oldCheckpointManager.stop
-
-      // Initialize coordinator stream
-      val coordinatorFactory = new CoordinatorStreamSystemFactory()
-      val coordinatorSystemConsumer = 
coordinatorFactory.getCoordinatorStreamSystemConsumer(config, new 
MetricsRegistryMap)
-      val coordinatorSystemProducer = 
coordinatorFactory.getCoordinatorStreamSystemProducer(config, new 
MetricsRegistryMap)
-      coordinatorSystemConsumer.register()
-      coordinatorSystemConsumer.start()
-
-      
assertEquals(coordinatorSystemConsumer.getBootstrappedStream(SetMigrationMetaMessage.TYPE).size,
 0)
-      coordinatorSystemConsumer.stop
-
-      // Start the migration
-      def getManager() = getKafkaCheckpointManager
-      migrate.migrate(config, getManager)
-
-      // Verify if the checkpoints have been migrated
-      val newCheckpointManager = new 
CheckpointManager(coordinatorSystemProducer, coordinatorSystemConsumer, "test")
-      newCheckpointManager.register(task1)
-      newCheckpointManager.register(task2)
-      newCheckpointManager.start()
-      assertEquals(cp1, newCheckpointManager.readLastCheckpoint(task1))
-      assertEquals(cp2, newCheckpointManager.readLastCheckpoint(task2))
-      newCheckpointManager.stop()
-
-      // Verify if the changelogPartitionInfo has been migrated
-      val newChangelogManager = new 
ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, 
"test")
-      newChangelogManager.start
-      val newChangelogMapping = 
newChangelogManager.readChangeLogPartitionMapping()
-      newChangelogManager.stop
-      assertEquals(newChangelogMapping.toMap, changelogMapping)
-
-      // Check for migration message
-      coordinatorSystemConsumer.register()
-      coordinatorSystemConsumer.start()
-      
assertEquals(coordinatorSystemConsumer.getBootstrappedStream(SetMigrationMetaMessage.TYPE).size,
 1)
-      coordinatorSystemConsumer.stop()
-    }
-    finally {
-      MockCoordinatorStreamSystemFactory.disableMockConsumerCache()
-    }
-  }
-
-  @Test
-  def 
testCheckpointShouldBeNullIfCheckpointTopicDoesNotExistShouldBeCreatedOnWriteAndShouldBeReadableAfterWrite
 {
-    val kcm = getKafkaCheckpointManager
-    val taskName = new TaskName(partition.toString)
-    kcm.register(taskName)
-    createCheckpointTopic()
-    kcm.validateTopic
-    // check that log compaction is enabled.
-    val zkClient = new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
-    val topicConfig = AdminUtils.fetchTopicConfig(zkClient, checkpointTopic)
-    zkClient.close
-    assertEquals("compact", topicConfig.get("cleanup.policy"))
-    assertEquals("26214400", topicConfig.get("segment.bytes"))
-    // read before topic exists should result in a null checkpoint
-    var readCp = kcm.readLastCheckpoint(taskName)
-    assertNull(readCp)
-    // create topic the first time around
-    writeCheckpoint(taskName, cp1)
-    readCp = kcm.readLastCheckpoint(taskName)
-    assertEquals(cp1, readCp)
-    // should get an exception if partition doesn't exist
-    try {
-      readCp = kcm.readLastCheckpoint(new TaskName(new Partition(1).toString))
-      fail("Expected a SamzaException, since only one partition (partition 0) 
should exist.")
-    } catch {
-      case e: SamzaException => None // expected
-      case _: Exception => fail("Expected a SamzaException, since only one 
partition (partition 0) should exist.")
-    }
-    // writing a second message should work, too
-    writeCheckpoint(taskName, cp2)
-    readCp = kcm.readLastCheckpoint(taskName)
-    assertEquals(cp2, readCp)
-    kcm.stop
-  }
-
-  @Test
-  def testUnrecoverableKafkaErrorShouldThrowKafkaCheckpointManagerException {
-    val exceptions = List("InvalidMessageException", 
"InvalidMessageSizeException", "UnknownTopicOrPartitionException")
-    exceptions.foreach { exceptionName =>
-      val kcm = getKafkaCheckpointManagerWithInvalidSerde(exceptionName)
-      val taskName = new TaskName(partition.toString)
-      kcm.register(taskName)
-      createCheckpointTopic(serdeCheckpointTopic)
-      kcm.validateTopic
-      writeCheckpoint(taskName, cp1, serdeCheckpointTopic)
-      // because serde will throw unrecoverable errors, it should result a 
KafkaCheckpointException
-      try {
-        kcm.readLastCheckpoint(taskName)
-        fail("Expected a KafkaCheckpointException.")
-      } catch {
-        case e: KafkaCheckpointException => None
-      }
-      kcm.stop
-    }
-  }
-
-  private def getKafkaCheckpointManager = new KafkaCheckpointManager(
-    clientId = "some-client-id",
-    checkpointTopic = checkpointTopic,
-    systemName = "kafka",
-    socketTimeout = 30000,
-    bufferSize = 64 * 1024,
-    fetchSize = 300 * 1024,
-    metadataStore = metadataStore,
-    connectProducer = () => new 
KafkaProducer(producerConfig.getProducerProperties),
-    connectZk = () => new ZkClient(zkConnect, 60000, 60000, 
ZKStringSerializer),
-    systemStreamPartitionGrouperFactoryString = 
systemStreamPartitionGrouperFactoryString,
-    checkpointTopicProperties = 
KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new 
MapConfig(Map[String, String]())))
-
-  // inject serde. Kafka exceptions will be thrown when serde.fromBytes is 
called
-  private def getKafkaCheckpointManagerWithInvalidSerde(exception: String) = 
new KafkaCheckpointManager(
-    clientId = "some-client-id-invalid-serde",
-    checkpointTopic = serdeCheckpointTopic,
-    systemName = "kafka",
-    socketTimeout = 30000,
-    bufferSize = 64 * 1024,
-    fetchSize = 300 * 1024,
-    metadataStore = metadataStore,
-    connectProducer = () => new 
KafkaProducer(producerConfig.getProducerProperties),
-    connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer),
-    systemStreamPartitionGrouperFactoryString = 
systemStreamPartitionGrouperFactoryString,
-    serde = new InvalideSerde(exception),
-    checkpointTopicProperties = 
KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new 
MapConfig(Map[String, String]())))
-
-  class InvalideSerde(exception: String) extends CheckpointSerde {
-    override def fromBytes(bytes: Array[Byte]): Checkpoint = {
-      exception match {
-        case "InvalidMessageException" => throw new InvalidMessageException
-        case "InvalidMessageSizeException" => throw new 
InvalidMessageSizeException
-        case "UnknownTopicOrPartitionException" => throw new 
UnknownTopicOrPartitionException
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TeskKafkaCheckpointLogKey.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TeskKafkaCheckpointLogKey.scala
 
b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TeskKafkaCheckpointLogKey.scala
new file mode 100644
index 0000000..c360b6c
--- /dev/null
+++ 
b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TeskKafkaCheckpointLogKey.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.checkpoint.kafka
+
+import org.apache.samza.SamzaException
+import org.apache.samza.container.TaskName
+import org.junit.Assert._
+import org.junit.{Before, Test}
+
+class TestKafkaCheckpointLogKey {
+  @Before
+  def setSSPGrouperFactoryString() {
+    KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString("hello")
+  }
+
+  @Test
+  def checkpointKeySerializationRoundTrip() {
+    val checkpointKey = KafkaCheckpointLogKey.getCheckpointKey(new 
TaskName("TN"))
+    val asBytes = checkpointKey.toBytes()
+    val backFromBytes = KafkaCheckpointLogKey.fromBytes(asBytes)
+
+    assertEquals(checkpointKey, backFromBytes)
+    assertNotSame(checkpointKey, backFromBytes)
+  }
+
+  @Test
+  def changelogPartitionMappingKeySerializationRoundTrip() {
+    val key = KafkaCheckpointLogKey.getChangelogPartitionMappingKey()
+    val asBytes = key.toBytes()
+    val backFromBytes = KafkaCheckpointLogKey.fromBytes(asBytes)
+
+    assertEquals(key, backFromBytes)
+    assertNotSame(key, backFromBytes)
+  }
+
+  @Test
+  def differingSSPGrouperFactoriesCauseException() {
+
+    val checkpointKey = KafkaCheckpointLogKey.getCheckpointKey(new 
TaskName("TN"))
+
+    val asBytes = checkpointKey.toBytes()
+
+    
KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString("goodbye")
+
+    var gotException = false
+    try {
+      KafkaCheckpointLogKey.fromBytes(asBytes)
+    } catch {
+      case se:SamzaException => assertEquals(new 
DifferingSystemStreamPartitionGrouperFactoryValues("hello", 
"goodbye").getMessage(), se.getCause.getMessage)
+        gotException = true
+    }
+
+    assertTrue("Should have had an exception since ssp grouper factories 
didn't match", gotException)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
 
b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
new file mode 100644
index 0000000..af4051b
--- /dev/null
+++ 
b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint.kafka
+
+import kafka.admin.AdminUtils
+import kafka.common.{InvalidMessageSizeException, 
UnknownTopicOrPartitionException}
+import kafka.message.InvalidMessageException
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.{TestUtils, TestZKUtils, Utils, ZKStringSerializer}
+import kafka.zk.EmbeddedZookeeper
+import org.I0Itec.zkclient.ZkClient
+import org.apache.kafka.clients.producer.{KafkaProducer, Producer, 
ProducerConfig, ProducerRecord}
+import org.apache.samza.checkpoint.Checkpoint
+import org.apache.samza.config.{KafkaProducerConfig, MapConfig}
+import org.apache.samza.container.TaskName
+import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
+import org.apache.samza.serializers.CheckpointSerde
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.util.{ClientUtilTopicMetadataStore, 
KafkaUtilException, TopicMetadataStore}
+import org.apache.samza.{Partition, SamzaException}
+import org.junit.Assert._
+import org.junit._
+
+import scala.collection.JavaConversions._
+import scala.collection._
+
+class TestKafkaCheckpointManager {
+
+  val checkpointTopic = "checkpoint-topic"
+  val serdeCheckpointTopic = "checkpoint-topic-invalid-serde"
+  val checkpointTopicConfig = 
KafkaCheckpointManagerFactory.getCheckpointTopicProperties(null)
+  val zkConnect: String = TestZKUtils.zookeeperConnect
+  var zkClient: ZkClient = null
+  val zkConnectionTimeout = 6000
+  val zkSessionTimeout = 6000
+
+  val brokerId1 = 0
+  val brokerId2 = 1
+  val brokerId3 = 2
+  val ports = TestUtils.choosePorts(3)
+  val (port1, port2, port3) = (ports(0), ports(1), ports(2))
+
+  val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
+  props1.put("controlled.shutdown.enable", "true")
+  val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
+  props1.put("controlled.shutdown.enable", "true")
+  val props3 = TestUtils.createBrokerConfig(brokerId3, port3)
+  props1.put("controlled.shutdown.enable", "true")
+
+  val config = new java.util.HashMap[String, Object]()
+  val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, 
port3)
+  config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
+  config.put("acks", "all")
+  config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
+  config.put(ProducerConfig.RETRIES_CONFIG, (new 
Integer(java.lang.Integer.MAX_VALUE-1)).toString)
+  config.putAll(KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES)
+  val producerConfig = new KafkaProducerConfig("kafka", "i001", config)
+  val partition = new Partition(0)
+  val partition2 = new Partition(1)
+  val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", 
partition) -> "123"))
+  val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", 
partition) -> "12345"))
+  var zookeeper: EmbeddedZookeeper = null
+  var server1: KafkaServer = null
+  var server2: KafkaServer = null
+  var server3: KafkaServer = null
+  var metadataStore: TopicMetadataStore = null
+
+  val systemStreamPartitionGrouperFactoryString = 
classOf[GroupByPartitionFactory].getCanonicalName
+
+  @Before
+  def beforeSetupServers {
+    zookeeper = new EmbeddedZookeeper(zkConnect)
+    server1 = TestUtils.createServer(new KafkaConfig(props1))
+    server2 = TestUtils.createServer(new KafkaConfig(props2))
+    server3 = TestUtils.createServer(new KafkaConfig(props3))
+    metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name")
+  }
+
+  @After
+  def afterCleanLogDirs {
+    server1.shutdown
+    server1.awaitShutdown()
+    server2.shutdown
+    server2.awaitShutdown()
+    server3.shutdown
+    server3.awaitShutdown()
+    Utils.rm(server1.config.logDirs)
+    Utils.rm(server2.config.logDirs)
+    Utils.rm(server3.config.logDirs)
+    zookeeper.shutdown
+  }
+
+  private def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint, 
cpTopic: String = checkpointTopic) = {
+    val producer: Producer[Array[Byte], Array[Byte]] = new 
KafkaProducer(producerConfig.getProducerProperties)
+    val record = new ProducerRecord(
+      cpTopic,
+      0,
+      KafkaCheckpointLogKey.getCheckpointKey(taskName).toBytes(),
+      new CheckpointSerde().toBytes(checkpoint)
+    )
+    try {
+      producer.send(record).get()
+    } catch {
+      case e: Exception => println(e.getMessage)
+    } finally {
+      producer.close()
+    }
+  }
+
+
+  private def createCheckpointTopic(cpTopic: String = checkpointTopic) = {
+    val zkClient = new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
+    try {
+      AdminUtils.createTopic(
+        zkClient,
+        checkpointTopic,
+        1,
+        1,
+        checkpointTopicConfig)
+    } catch {
+      case e: Exception => println(e.getMessage)
+    } finally {
+      zkClient.close
+    }
+  }
+
+  @Test
+  def 
testCheckpointShouldBeNullIfCheckpointTopicDoesNotExistShouldBeCreatedOnWriteAndShouldBeReadableAfterWrite
 {
+    val kcm = getKafkaCheckpointManager
+    val taskName = new TaskName(partition.toString)
+    kcm.register(taskName)
+    createCheckpointTopic()
+    kcm.kafkaUtil.validateTopicPartitionCount(checkpointTopic, "kafka", 
metadataStore, 1)
+
+    // check that log compaction is enabled.
+    val zkClient = new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
+    val topicConfig = AdminUtils.fetchTopicConfig(zkClient, checkpointTopic)
+    zkClient.close
+    assertEquals("compact", topicConfig.get("cleanup.policy"))
+    assertEquals("26214400", topicConfig.get("segment.bytes"))
+
+    // read before topic exists should result in a null checkpoint
+    var readCp = kcm.readLastCheckpoint(taskName)
+    assertNull(readCp)
+
+    // create topic the first time around
+    writeCheckpoint(taskName, cp1)
+    readCp = kcm.readLastCheckpoint(taskName)
+    assertEquals(cp1, readCp)
+
+    // should get an exception if partition doesn't exist
+    try {
+      readCp = kcm.readLastCheckpoint(new TaskName(new Partition(1).toString))
+      fail("Expected a SamzaException, since only one partition (partition 0) 
should exist.")
+    } catch {
+      case e: SamzaException => None // expected
+      case _: Exception => fail("Expected a SamzaException, since only one 
partition (partition 0) should exist.")
+    }
+
+    // writing a second message should work, too
+    writeCheckpoint(taskName, cp2)
+    readCp = kcm.readLastCheckpoint(taskName)
+    assertEquals(cp2, readCp)
+    kcm.stop
+  }
+
+  @Test
+  def testUnrecoverableKafkaErrorShouldThrowKafkaCheckpointManagerException {
+    val exceptions = List("InvalidMessageException", 
"InvalidMessageSizeException", "UnknownTopicOrPartitionException")
+    exceptions.foreach { exceptionName =>
+      val kcm = getKafkaCheckpointManagerWithInvalidSerde(exceptionName)
+      val taskName = new TaskName(partition.toString)
+      kcm.register(taskName)
+      createCheckpointTopic(serdeCheckpointTopic)
+      kcm.kafkaUtil.validateTopicPartitionCount(serdeCheckpointTopic, "kafka", 
metadataStore, 1)
+      writeCheckpoint(taskName, cp1, serdeCheckpointTopic)
+      // because serde will throw unrecoverable errors, it should result a 
KafkaCheckpointException
+      try {
+        kcm.readLastCheckpoint(taskName)
+        fail("Expected a KafkaUtilException.")
+      } catch {
+        case e: KafkaUtilException => None
+      }
+      kcm.stop
+    }
+  }
+
+  private def getKafkaCheckpointManager = new KafkaCheckpointManager(
+    clientId = "some-client-id",
+    checkpointTopic = checkpointTopic,
+    systemName = "kafka",
+    replicationFactor = 3,
+    socketTimeout = 30000,
+    bufferSize = 64 * 1024,
+    fetchSize = 300 * 1024,
+    metadataStore = metadataStore,
+    connectProducer = () => new 
KafkaProducer(producerConfig.getProducerProperties),
+    connectZk = () => new ZkClient(zkConnect, 60000, 60000, 
ZKStringSerializer),
+    systemStreamPartitionGrouperFactoryString = 
systemStreamPartitionGrouperFactoryString,
+    checkpointTopicProperties = 
KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new 
MapConfig(Map[String, String]())))
+
+  // inject serde. Kafka exceptions will be thrown when serde.fromBytes is 
called
+  private def getKafkaCheckpointManagerWithInvalidSerde(exception: String) = 
new KafkaCheckpointManager(
+    clientId = "some-client-id-invalid-serde",
+    checkpointTopic = serdeCheckpointTopic,
+    systemName = "kafka",
+    replicationFactor = 3,
+    socketTimeout = 30000,
+    bufferSize = 64 * 1024,
+    fetchSize = 300 * 1024,
+    metadataStore = metadataStore,
+    connectProducer = () => new 
KafkaProducer(producerConfig.getProducerProperties),
+    connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer),
+    systemStreamPartitionGrouperFactoryString = 
systemStreamPartitionGrouperFactoryString,
+    serde = new InvalideSerde(exception),
+    checkpointTopicProperties = 
KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new 
MapConfig(Map[String, String]())))
+
+  class InvalideSerde(exception: String) extends CheckpointSerde {
+    override def fromBytes(bytes: Array[Byte]): Checkpoint = {
+      exception match {
+        case "InvalidMessageException" => throw new InvalidMessageException
+        case "InvalidMessageSizeException" => throw new 
InvalidMessageSizeException
+        case "UnknownTopicOrPartitionException" => throw new 
UnknownTopicOrPartitionException
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-kafka/src/test/scala/org/apache/samza/migration/TestKafkaCheckpointMigration.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/test/scala/org/apache/samza/migration/TestKafkaCheckpointMigration.scala
 
b/samza-kafka/src/test/scala/org/apache/samza/migration/TestKafkaCheckpointMigration.scala
new file mode 100644
index 0000000..504fc89
--- /dev/null
+++ 
b/samza-kafka/src/test/scala/org/apache/samza/migration/TestKafkaCheckpointMigration.scala
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.migration
+
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.{TestUtils, TestZKUtils, Utils}
+import kafka.zk.EmbeddedZookeeper
+import org.I0Itec.zkclient.ZkClient
+import org.apache.kafka.clients.producer.{KafkaProducer, Producer, 
ProducerConfig, ProducerRecord}
+import org.apache.samza.checkpoint.Checkpoint
+import org.apache.samza.checkpoint.kafka.{KafkaCheckpointManager, 
KafkaCheckpointLogKey, KafkaCheckpointManagerFactory}
+import org.apache.samza.config._
+import org.apache.samza.container.TaskName
+import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
+import org.apache.samza.coordinator.MockSystemFactory
+import org.apache.samza.coordinator.stream.messages.SetMigrationMetaMessage
+import org.apache.samza.coordinator.stream._
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.serializers.CheckpointSerde
+import org.apache.samza.storage.ChangelogPartitionManager
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.util._
+import org.apache.samza.Partition
+import org.junit.Assert._
+import org.junit._
+
+import scala.collection.JavaConversions._
+import scala.collection._
+
+class TestKafkaCheckpointMigration {
+
+  val checkpointTopic = "checkpoint-topic"
+  val serdeCheckpointTopic = "checkpoint-topic-invalid-serde"
+  val checkpointTopicConfig = 
KafkaCheckpointManagerFactory.getCheckpointTopicProperties(null)
+  val zkConnect: String = TestZKUtils.zookeeperConnect
+  var zkClient: ZkClient = null
+  val zkConnectionTimeout = 6000
+  val zkSessionTimeout = 6000
+
+  val brokerId1 = 0
+  val brokerId2 = 1
+  val brokerId3 = 2
+  val ports = TestUtils.choosePorts(3)
+  val (port1, port2, port3) = (ports(0), ports(1), ports(2))
+
+  val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
+  props1.put("controlled.shutdown.enable", "true")
+  val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
+  props1.put("controlled.shutdown.enable", "true")
+  val props3 = TestUtils.createBrokerConfig(brokerId3, port3)
+  props1.put("controlled.shutdown.enable", "true")
+
+  val config = new java.util.HashMap[String, Object]()
+  val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, 
port3)
+  config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
+  config.put("acks", "all")
+  config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
+  config.put(ProducerConfig.RETRIES_CONFIG, (new 
Integer(java.lang.Integer.MAX_VALUE-1)).toString)
+  config.putAll(KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES)
+  val producerConfig = new KafkaProducerConfig("kafka", "i001", config)
+  val partition = new Partition(0)
+  val partition2 = new Partition(1)
+  val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", 
partition) -> "123"))
+  val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", 
partition) -> "12345"))
+  var zookeeper: EmbeddedZookeeper = null
+  var server1: KafkaServer = null
+  var server2: KafkaServer = null
+  var server3: KafkaServer = null
+  var metadataStore: TopicMetadataStore = null
+
+  val systemStreamPartitionGrouperFactoryString = 
classOf[GroupByPartitionFactory].getCanonicalName
+
+  @Before
+  def beforeSetupServers {
+    zookeeper = new EmbeddedZookeeper(zkConnect)
+    server1 = TestUtils.createServer(new KafkaConfig(props1))
+    server2 = TestUtils.createServer(new KafkaConfig(props2))
+    server3 = TestUtils.createServer(new KafkaConfig(props3))
+    metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name")
+  }
+
+  @After
+  def afterCleanLogDirs {
+    server1.shutdown
+    server1.awaitShutdown()
+    server2.shutdown
+    server2.awaitShutdown()
+    server3.shutdown
+    server3.awaitShutdown()
+    Utils.rm(server1.config.logDirs)
+    Utils.rm(server2.config.logDirs)
+    Utils.rm(server3.config.logDirs)
+    zookeeper.shutdown
+  }
+
+  private def writeChangeLogPartitionMapping(changelogMapping: Map[TaskName, 
Integer], cpTopic: String = checkpointTopic) = {
+    val producer: Producer[Array[Byte], Array[Byte]] = new 
KafkaProducer(producerConfig.getProducerProperties)
+    val record = new ProducerRecord(
+      cpTopic,
+      0,
+      KafkaCheckpointLogKey.getChangelogPartitionMappingKey().toBytes(),
+      new CheckpointSerde().changelogPartitionMappingToBytes(changelogMapping)
+    )
+    try {
+      producer.send(record).get()
+    } catch {
+      case e: Exception => println(e.getMessage)
+    } finally {
+      producer.close()
+    }
+  }
+
+  @Test
+  def testMigrationWithNoCheckpointTopic() {
+    val mapConfig = Map[String, String](
+      "task.checkpoint.factory" -> 
"org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory",
+      JobConfig.JOB_NAME -> "test",
+      JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
+      JobConfig.JOB_CONTAINER_COUNT -> "2",
+      "task.inputs" -> "test.stream1",
+      "task.checkpoint.system" -> "test",
+      SystemConfig.SYSTEM_FACTORY.format("test") -> 
classOf[MockSystemFactory].getCanonicalName,
+      "systems.test.producer.bootstrap.servers" -> brokers,
+      "systems.test.consumer.zookeeper.connect" -> zkConnect,
+      SystemConfig.SYSTEM_FACTORY.format("coordinator") -> 
classOf[MockCoordinatorStreamSystemFactory].getName)
+
+    // Enable consumer caching
+    MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
+
+    val config: MapConfig = new MapConfig(mapConfig)
+    val migrate = new KafkaCheckpointMigration
+    migrate.migrate(config)
+    val consumer = new 
CoordinatorStreamSystemFactory().getCoordinatorStreamSystemConsumer(config, new 
NoOpMetricsRegistry)
+    consumer.register()
+    consumer.start()
+    consumer.bootstrap()
+    val bootstrappedStream = 
consumer.getBootstrappedStream(SetMigrationMetaMessage.TYPE)
+    assertEquals(1, bootstrappedStream.size())
+
+    val expectedMigrationMessage = new 
SetMigrationMetaMessage("CHECKPOINTMIGRATION", "CheckpointMigration09to10", 
"true")
+    assertEquals(expectedMigrationMessage, bootstrappedStream.head)
+    consumer.stop()
+  }
+
+  @Test
+  def testMigration() {
+    try {
+      val mapConfig = Map(
+        "task.checkpoint.factory" -> 
"org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory",
+        JobConfig.JOB_NAME -> "test",
+        JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
+        JobConfig.JOB_CONTAINER_COUNT -> "2",
+        "task.inputs" -> "test.stream1",
+        "task.checkpoint.system" -> "test",
+        "systems.test.producer.bootstrap.servers" -> brokers,
+        "systems.test.consumer.zookeeper.connect" -> zkConnect,
+        SystemConfig.SYSTEM_FACTORY.format("test") -> 
classOf[MockSystemFactory].getCanonicalName,
+        SystemConfig.SYSTEM_FACTORY.format("coordinator") -> 
classOf[MockCoordinatorStreamSystemFactory].getName)
+      // Enable consumer caching
+      MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
+
+      val config = new MapConfig(mapConfig)
+      val checkpointTopicName = KafkaUtil.getCheckpointTopic("test", "1")
+      val checkpointManager = new 
KafkaCheckpointManagerFactory().getCheckpointManager(config, new 
NoOpMetricsRegistry).asInstanceOf[KafkaCheckpointManager]
+
+      // Write a couple of checkpoints in the old checkpoint topic
+      val task1 = new TaskName(partition.toString)
+      val task2 = new TaskName(partition2.toString)
+      checkpointManager.start
+      checkpointManager.register(task1)
+      checkpointManager.register(task2)
+      checkpointManager.writeCheckpoint(task1, cp1)
+      checkpointManager.writeCheckpoint(task2, cp2)
+
+      // Write changelog partition info to the old checkpoint topic
+      val changelogMapping = Map(task1 -> 1.asInstanceOf[Integer], task2 -> 
10.asInstanceOf[Integer])
+      writeChangeLogPartitionMapping(changelogMapping, checkpointTopicName)
+      checkpointManager.stop
+
+      // Initialize coordinator stream
+      val coordinatorFactory = new CoordinatorStreamSystemFactory()
+      val coordinatorSystemConsumer = 
coordinatorFactory.getCoordinatorStreamSystemConsumer(config, new 
MetricsRegistryMap)
+      val coordinatorSystemProducer = 
coordinatorFactory.getCoordinatorStreamSystemProducer(config, new 
MetricsRegistryMap)
+      coordinatorSystemConsumer.register()
+      coordinatorSystemConsumer.start()
+
+      
assertEquals(coordinatorSystemConsumer.getBootstrappedStream(SetMigrationMetaMessage.TYPE).size,
 0)
+      coordinatorSystemConsumer.stop
+
+      // Start the migration
+      val migrationInstance = new KafkaCheckpointMigration
+      migrationInstance.migrate(config)
+
+      // Verify if the changelogPartitionInfo has been migrated
+      val newChangelogManager = new 
ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, 
"test")
+      newChangelogManager.start
+      val newChangelogMapping = 
newChangelogManager.readChangeLogPartitionMapping()
+      newChangelogManager.stop
+      assertEquals(newChangelogMapping.toMap, changelogMapping)
+
+      // Check for migration message
+      coordinatorSystemConsumer.register()
+      coordinatorSystemConsumer.start()
+      
assertEquals(coordinatorSystemConsumer.getBootstrappedStream(SetMigrationMetaMessage.TYPE).size,
 1)
+      coordinatorSystemConsumer.stop()
+    }
+    finally {
+      MockCoordinatorStreamSystemFactory.disableMockConsumerCache()
+    }
+  }
+
+  class MockKafkaCheckpointMigration extends KafkaCheckpointMigration{
+    var migrationCompletionMarkFlag: Boolean = false
+    var migrationVerificationMarkFlag: Boolean = false
+
+    override def migrationCompletionMark(coordinatorStreamProducer: 
CoordinatorStreamSystemProducer) = {
+      migrationCompletionMarkFlag = true
+      super.migrationCompletionMark(coordinatorStreamProducer)
+    }
+
+    override def migrationVerification(coordinatorStreamConsumer: 
CoordinatorStreamSystemConsumer): Boolean = {
+      migrationVerificationMarkFlag = true
+      super.migrationVerification(coordinatorStreamConsumer)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
 
b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
index b20e351..e2b45d7 100644
--- 
a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
+++ 
b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
@@ -87,7 +87,7 @@ public class TestContainerAllocator {
       containers.put(i, container);
     }
     JobModel jobModel = new JobModel(config, containers);
-    return new JobCoordinator(jobModel, server, null);
+    return new JobCoordinator(jobModel, server);
   }
 
   @Before

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
 
b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
index 08e53aa..269d824 100644
--- 
a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
+++ 
b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
@@ -98,7 +98,7 @@ public class TestHostAwareContainerAllocator {
       containers.put(i, container);
     }
     JobModel jobModel = new JobModel(getConfig(), containers);
-    return new JobCoordinator(jobModel, server, null);
+    return new JobCoordinator(jobModel, server);
   }
 
 

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java 
b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java
index b12ae5c..88d9f24 100644
--- 
a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java
+++ 
b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java
@@ -109,7 +109,7 @@ public class TestSamzaTaskManager {
     when(mockLocalityManager.readContainerLocality()).thenReturn(localityMap);
 
     JobModel jobModel = new JobModel(getConfig(), containers, 
mockLocalityManager);
-    return new JobCoordinator(jobModel, server, null);
+    return new JobCoordinator(jobModel, server);
   }
 
   @Before

http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
 
b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
index ec5a853..30cf34f 100644
--- 
a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
+++ 
b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
@@ -37,7 +37,7 @@ import java.net.URL
 import org.apache.samza.coordinator.JobCoordinator
 
 class TestSamzaAppMasterLifecycle {
-  val coordinator = new JobCoordinator(null, null, null)
+  val coordinator = new JobCoordinator(null, null)
   val amClient = new AMRMClientAsyncImpl[ContainerRequest](1, 
Mockito.mock(classOf[CallbackHandler])) {
     var host = ""
     var port = 0

Reply via email to