-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32147/#review77063
-----------------------------------------------------------


* Comb over all docs and update checkpoint manager docs. 
* Add some documentation to docs directory for coordinator stream. This is a 
new component, and fairly critical to how Samza works. We should document it 
clearly.
* Patch doesn't seem to apply against master.
* Will need a follow-on ticket to write a topic migrator, so we can migrate 
0.8.0 checkpoint topics to 0.9.0. This should be done automatically, without 
any human intervention.
* It seems like JsonSerde is deleted. Was this intentional?


samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
<https://reviews.apache.org/r/32147/#comment124887>

    Does Checkpoint need to be in the samza-api package? Seems like we should 
just move it to core to hide it from users, since they don't need it.



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
<https://reviews.apache.org/r/32147/#comment124888>

    I'm confused. getMessageMap always returns a message with no values?



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
<https://reviews.apache.org/r/32147/#comment124894>

    I think the source is usually the container ID (unless a CLI is used).



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
<https://reviews.apache.org/r/32147/#comment124895>

    Might want to provide an explicit example of SSP to show how it's 
serialized (as system.stream.partition, e.g. kafka.AdViewEvent.0).
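    As a sketch of what such an example could pin down, here is a small 
hypothetical value type (SspKey is an illustrative name, not Samza's 
SystemStreamPartition) that round-trips the system.stream.partition format. It 
assumes system names contain no dots; stream names may, so only the first and 
last dots are treated as separators.

```java
import java.util.Objects;

/**
 * Hypothetical value type (not Samza's SystemStreamPartition) showing the
 * "system.stream.partition" key format, e.g. "kafka.AdViewEvent.0".
 */
final class SspKey {
  final String system;
  final String stream;
  final int partition;

  SspKey(String system, String stream, int partition) {
    this.system = Objects.requireNonNull(system, "system");
    this.stream = Objects.requireNonNull(stream, "stream");
    this.partition = partition;
  }

  /** Serializes as system.stream.partition. */
  String serialize() {
    return system + "." + stream + "." + partition;
  }

  /**
   * Parses system.stream.partition. Assumes the system name contains no dots;
   * the stream name may (e.g. "page.views"), so only the first and last dots
   * are treated as separators.
   */
  static SspKey parse(String s) {
    int first = s.indexOf('.');
    int last = s.lastIndexOf('.');
    if (first <= 0 || last <= first + 1 || last == s.length() - 1) {
      throw new IllegalArgumentException("Expected system.stream.partition, got: " + s);
    }
    return new SspKey(
        s.substring(0, first),
        s.substring(first + 1, last),
        Integer.parseInt(s.substring(last + 1)));
  }
}
```

    The dotted-stream-name case is the one worth documenting explicitly, since 
a naive split on every dot would mis-parse it.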



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
<https://reviews.apache.org/r/32147/#comment124891>

    I think that there might be enough messages (config, checkpoint, changelog) 
to warrant distinct class files for each one. Probably in a package like 
org.apache.samza.coordinator.stream.messages



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
<https://reviews.apache.org/r/32147/#comment124896>

    Will this work? If getMessageMap is always returning a map with no values...



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
<https://reviews.apache.org/r/32147/#comment124899>

    Nit: SetChangelog -> SetChangelogMapping



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
<https://reviews.apache.org/r/32147/#comment124903>

    Same source comment as above.



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
<https://reviews.apache.org/r/32147/#comment124904>

    Constant for "Partition", like set-changelog
    
    Nit: lowercase "partition" for consistency with other message constants.



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
<https://reviews.apache.org/r/32147/#comment124907>

    We don't care about ordering for anything right now, right?



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
<https://reviews.apache.org/r/32147/#comment124905>

    Nit: single-line if() {



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
<https://reviews.apache.org/r/32147/#comment124906>

    Probably debug here, otherwise it'll be a bit verbose, I think.



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
<https://reviews.apache.org/r/32147/#comment124909>

    Javadocs.



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
<https://reviews.apache.org/r/32147/#comment124908>

    nit: if() {
      ...
    }



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
<https://reviews.apache.org/r/32147/#comment124910>

    nit: if() {
      ...
    }



samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
<https://reviews.apache.org/r/32147/#comment124947>

    I think this can be deleted.



samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
<https://reviews.apache.org/r/32147/#comment124946>

    protected



samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
<https://reviews.apache.org/r/32147/#comment124948>

    Map[,]()
    
    with `import scala.collection.JavaConversions._`



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
<https://reviews.apache.org/r/32147/#comment124967>

    Should not be static objects. Create in apply, and pass around as needed. 
Given how simple getCheckpointManager and getChangelogManager are, I think 
those methods should be deleted, and everything should just be done in apply()



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
<https://reviews.apache.org/r/32147/#comment124969>

    Do you need to register taskNames here? Seems like kind of a 
chicken-and-egg problem. I think taskNames aren't used in changelogManager.



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
<https://reviews.apache.org/r/32147/#comment124974>

    Can we do:
    
        val offsetMap = systemStreamPartition
          .map(ssp => (ssp -> null))
          .toMap ++ checkpoint.getOffsets
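    For comparison, a hypothetical Java helper with the same semantics (the 
name OffsetMaps and the String offset type are assumptions): every partition 
starts mapped to null, then checkpointed offsets overwrite those entries, just 
as the right-hand side of Scala's ++ wins.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

final class OffsetMaps {
  /**
   * Every partition starts mapped to null ("no checkpoint yet"); checkpointed
   * offsets then overwrite those entries. HashMap is used because it permits
   * null values.
   */
  static <K> Map<K, String> withCheckpointedOffsets(
      Collection<K> partitions, Map<K, String> checkpointed) {
    Map<K, String> offsets = new HashMap<>();
    for (K partition : partitions) {
      offsets.put(partition, null);
    }
    offsets.putAll(checkpointed);
    return offsets;
  }
}
```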



samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala
<https://reviews.apache.org/r/32147/#comment124976>

    This is really weird. JsonSerde with TestJsonSerde class in it? Is RB 
messed up?



samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
<https://reviews.apache.org/r/32147/#comment124983>

    See comment in CheckpointTool about making CheckpointManager injectable.



samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
<https://reviews.apache.org/r/32147/#comment124984>

    See comment in CheckpointTool about making CheckpointManager injectable.



samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
<https://reviews.apache.org/r/32147/#comment124980>

    I remember that we talked about this, but why are we deleting this rather 
than fixing it again?



samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
<https://reviews.apache.org/r/32147/#comment124985>

    nit: double new line



samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
<https://reviews.apache.org/r/32147/#comment124986>

    nit: = {
    on single line



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
<https://reviews.apache.org/r/32147/#comment124987>

    I love to see a plan come together. :D



samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
<https://reviews.apache.org/r/32147/#comment124868>

    Waiting for an hour seems a bit excessive. Was there any reason this was 
changed?



samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
<https://reviews.apache.org/r/32147/#comment124859>

    Move below the apache docs.



samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java
<https://reviews.apache.org/r/32147/#comment124860>

    Does this class do anything?



samza-api/src/main/java/org/apache/samza/container/TaskName.java
<https://reviews.apache.org/r/32147/#comment124861>

    Could you add an explanation as to why it should only contain the task name?



samza-core/src/main/java/org/apache/samza/changelog/ChangelogManager.java
<https://reviews.apache.org/r/32147/#comment124880>

    Can we put this in org.apache.samza.storage?



samza-core/src/main/java/org/apache/samza/changelog/ChangelogManager.java
<https://reviews.apache.org/r/32147/#comment124877>

    A slightly better name for this is probably ChangelogPartitionManager, or 
something like that. I worry that ChangelogManager is too close to 
TaskStorageManager.



samza-core/src/main/java/org/apache/samza/changelog/ChangelogManager.java
<https://reviews.apache.org/r/32147/#comment124886>

    Never used. Did you mean to validate like you do in CheckpointManager?



samza-core/src/main/java/org/apache/samza/changelog/ChangelogManager.java
<https://reviews.apache.org/r/32147/#comment124871>

    Checkpoints? This is the changelog manager.



samza-core/src/main/java/org/apache/samza/changelog/ChangelogManager.java
<https://reviews.apache.org/r/32147/#comment124873>

    Same as above.



samza-core/src/main/java/org/apache/samza/changelog/ChangelogManager.java
<https://reviews.apache.org/r/32147/#comment124874>

    Is calling register() on the coordinatorStreamConsumer multiple times OK? 
Since it takes no param, it seems like you'd only need to call it once, or not 
at all (it could just be invoked internally when start() is called).
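    One possible shape, as a hypothetical sketch (not the patch's 
CoordinatorStreamSystemConsumer): register() is idempotent, and start() calls 
it itself, so CheckpointManager and ChangelogManager never need to coordinate 
who registers.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class CoordinatorStreamConsumerSketch {
  private final AtomicBoolean registered = new AtomicBoolean(false);
  private int registrations = 0; // counts real registrations, for illustration

  /** Safe to call any number of times; only the first call does any work. */
  void register() {
    if (registered.compareAndSet(false, true)) {
      registrations++; // one-time registration work would go here
    }
  }

  /** Registers implicitly, so callers never have to remember to. */
  void start() {
    register();
    // ... start polling the coordinator stream ...
  }

  int registrationCount() {
    return registrations;
  }
}
```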



samza-core/src/main/java/org/apache/samza/changelog/ChangelogManager.java
<https://reviews.apache.org/r/32147/#comment124875>

    Seems like this logic should get folded into the CoordinatorStreamConsumer. 
CheckpointManager and ChangelogManager both seem to have nearly the same 
logic.



samza-core/src/main/java/org/apache/samza/changelog/ChangelogManager.java
<https://reviews.apache.org/r/32147/#comment124878>

    Nit: might want to import CoordinatorStreamMessage.SetChangelogMapping, 
just to make things a bit more succinct.



samza-core/src/main/java/org/apache/samza/changelog/ChangelogManager.java
<https://reviews.apache.org/r/32147/#comment124879>

    "changelogEntries", since each row results in a new message.



samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
<https://reviews.apache.org/r/32147/#comment124882>

    Given how similar this class is to ChangelogManager, does it make sense to 
just have one CoordinatorStreamManager that has read/write methods for all of 
the CoordinatorStreamMessage types? Seems like it would allow us to delete a fair 
amount of code.
    
    Or, perhaps we should write a base class, and extend from it on a 
per-message-type basis?
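    A rough sketch of the base-class option, with every name hypothetical: the 
shared "replay the stream, keep the latest value per key for my message type" 
loop lives in one place, and each manager only supplies its type tag. The 
"set-changelog" tag mirrors the constant mentioned above; "set-checkpoint" is 
an assumed name.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical minimal coordinator stream message: type tag, key, value. */
final class StreamMessage {
  final String type;
  final String key;
  final String value;

  StreamMessage(String type, String key, String value) {
    this.type = type;
    this.key = key;
    this.value = value;
  }
}

/** Shared read/write logic, written once instead of per manager. */
abstract class CoordinatorStreamManagerBase {
  private final String messageType;

  protected CoordinatorStreamManagerBase(String messageType) {
    this.messageType = messageType;
  }

  /** Replays bootstrapped messages; later values for the same key win. */
  Map<String, String> readLatest(List<StreamMessage> bootstrapped) {
    Map<String, String> latest = new HashMap<>();
    for (StreamMessage m : bootstrapped) {
      if (messageType.equals(m.type)) {
        latest.put(m.key, m.value);
      }
    }
    return latest;
  }

  /** Builds an outgoing message tagged with this manager's type. */
  StreamMessage write(String key, String value) {
    return new StreamMessage(messageType, key, value);
  }
}

final class CheckpointManagerSketch extends CoordinatorStreamManagerBase {
  CheckpointManagerSketch() { super("set-checkpoint"); } // assumed type tag
}

final class ChangelogManagerSketch extends CoordinatorStreamManagerBase {
  ChangelogManagerSketch() { super("set-changelog"); }
}
```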



samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
<https://reviews.apache.org/r/32147/#comment124863>

    Javadocs.



samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
<https://reviews.apache.org/r/32147/#comment124881>

    Seems like a dupe of what's in ChangelogManager. Think this logic should be 
moved to CoordinatorStreamConsumer, and used in both places. Or maybe moved to 
a shared base class.



samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
<https://reviews.apache.org/r/32147/#comment124885>

    else we should probably do something nasty like throw an exception.
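    Something along these lines, as a hypothetical sketch (the dispatch method 
and the "set-checkpoint" tag are assumptions): unknown message types fail 
loudly rather than falling through silently.

```java
/**
 * Hypothetical dispatch over coordinator stream message types: an unknown type
 * fails loudly instead of being silently ignored.
 */
final class MessageTypeDispatch {
  static String handlerFor(String messageType) {
    switch (messageType) {
      case "set-checkpoint":
        return "checkpoint";
      case "set-changelog":
        return "changelog";
      default:
        throw new IllegalStateException(
            "Unexpected coordinator stream message type: " + messageType);
    }
  }
}
```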



samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
<https://reviews.apache.org/r/32147/#comment124934>

    Did IntelliJ generate this (and hashCode)?



samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
<https://reviews.apache.org/r/32147/#comment124943>

    Is this in Util, or does any other code do something similar? Seems like it 
belongs in Util.



samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
<https://reviews.apache.org/r/32147/#comment124944>

    Same Util question as above.



samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
<https://reviews.apache.org/r/32147/#comment124982>

    Can you make CheckpointTool take a CheckpointManager param, and add an 
apply() method to the companion object, which builds a CheckpointTool object 
given config and newOffsets? That way you can inject CheckpointManagers for 
unit testing rather than having the setCheckpointManager method below.
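    A Java sketch of the same idea, with hypothetical names (OffsetStore stands 
in for CheckpointManager, and run() is a simplified merge-and-write, not the 
tool's actual behavior): the dependency arrives through the constructor, so a 
test can hand in a fake. In Scala, the production wiring from config would go 
in the companion object's apply(); it is omitted here.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical narrow interface standing in for CheckpointManager. */
interface OffsetStore {
  Map<String, String> readOffsets();

  void writeOffsets(Map<String, String> offsets);
}

/** The tool receives its store via the constructor, so tests can inject a fake. */
final class CheckpointToolSketch {
  private final OffsetStore store;
  private final Map<String, String> newOffsets;

  CheckpointToolSketch(OffsetStore store, Map<String, String> newOffsets) {
    this.store = store;
    this.newOffsets = new HashMap<>(newOffsets);
  }

  /** Simplified: overlays newOffsets on what is stored and writes the result. */
  void run() {
    Map<String, String> merged = new HashMap<>(store.readOffsets());
    merged.putAll(newOffsets);
    store.writeOffsets(merged);
  }
}
```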



samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
<https://reviews.apache.org/r/32147/#comment124949>

    filter(_._2 != null)
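    For reference, a hypothetical Java counterpart of that one-liner (the 
helper name is made up): copy the map, then drop entries whose value is null, 
leaving the caller's map untouched.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

final class NonNullOffsets {
  /** Java counterpart of offsets.filter(_._2 != null): a copy without null values. */
  static <K, V> Map<K, V> withoutNullValues(Map<K, V> offsets) {
    Map<K, V> result = new HashMap<>(offsets);
    result.values().removeIf(Objects::isNull);
    return result;
  }
}
```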



samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/32147/#comment124965>

    nit: I think we've been using "Checkpoint" not "CheckPoint" in most places.



samza-test/src/main/config/join/common.properties
<https://reviews.apache.org/r/32147/#comment124989>

    Should this be kafka-checkpoints? Also, should we rename that system? 
also^2, do we need two systems anymore?



samza-test/src/main/config/negate-number.properties
<https://reviews.apache.org/r/32147/#comment124988>

    Is this required? There's only one system defined.



samza-test/src/main/config/perf/container-performance.properties
<https://reviews.apache.org/r/32147/#comment124990>

    Does this matter? Should coordinator stream always start from the oldest 
offset?



samza-test/src/main/python/samza_failure_testing.py
<https://reviews.apache.org/r/32147/#comment124869>

    Convention is usually #!/.... with no space.
    
    Also, `#!/usr/bin/env python` is usually the preferred location, I think 
(space after env is intentional). See:
    
    
http://stackoverflow.com/questions/2429511/why-do-people-write-usr-bin-env-python-on-the-first-line-of-a-python-script


- Chris Riccomini


On March 17, 2015, 11:25 p.m., Naveen Somasundaram wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32147/
> -----------------------------------------------------------
> 
> (Updated March 17, 2015, 11:25 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> SAMZA-465
> 
> 
> Diffs
> -----
> 
>   build.gradle 0a268ac7a3819cf46b54a93e0e3171455371456a
>   samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java 593d11872430100e000c7d4b6edc5ef29649d8d4
>   samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java 092cb910b40d312217e86420bf1ddfbaf605e9e5
>   samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java a97ff0919d8205928efee1a2a20780659180849d
>   samza-api/src/main/java/org/apache/samza/container/TaskName.java 083358686fc69ab45bbc73e898f419224ebc3a9f
>   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 8995ba30c823bddcdfd3af7100e1440e71ef7998
>   samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java 6ff1a555f3c48e416bb78e94c5df71eff0a27f3d
>   samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 01997ae22641b735cd452a0e89a49219e2874892
>   samza-core/src/main/java/org/apache/samza/changelog/ChangelogManager.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java eb22d2ec5f09ca59790e2871d9bff9745fe925dc
>   samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java 7dc431c74a3fc2ba80eb47d6c5d87524cb4c9bde
>   samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java 3517912eaafbf95f8c8cc70ab5869548a56b76e7
>   samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala ddc30af7c52d8a4d5c5de02f6757c040b1f31c93
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala a40c87fa7865746a5612c55a4cf24c8d005be7e0
>   samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala 2a87a6e0cef72179b5383fc824266de1f9606293
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 3b6685e00837a4aaf809813e62b7e52823bc07a9
>   samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala 1a2dd4413f56e53dbeeb47b5637d7b0c50522f02
>   samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala adef09e15c666cb2dbb2e4c5507fc2d605b82a1e
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 1ca9e2cc5673c537b6a48224809847e94da81fca
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 275eb1a924d09a0a43efe6273e0d2af9217e1c74
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala c14f2f623bb4bae911dd3085ce428175930e4545
>   samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala 10986a49b39cda703a0e54688dc914f2465c79c9
>   samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala 635c3531a897a369c813821f7b901186e1281ed1
>   samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala 0b720ec4dd83c71fd1ce5071571c7a10babf0ddc
>   samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b80d34953a54ada461ed1d4b0dcfa07f4435f877
>   samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 530255e5866bc49ec5ce1a0b7437470cd4e17010
>   samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala 744eec05857a4ea14c718e3750fb575d3678b1f8
>   samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala ec1d749cb5186f788b402877996a4caa37e99362
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 1a67586eeec95dabfeb3b6881af9b3865c3029ca
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java 76bc6810a3162a1dc58a36033b3b1f75616bd6ca
>   samza-core/src/test/resources/test.properties 9348c7de956ebf02f58a163dc6fb391a7e29ae64
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala af800dfeedbfea75abaac3f15fd53bc55b743daf
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala d18d4c45c5de3b50a24d6c776364e1f589db8f4d
>   samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala 10ff1f437220b38810f8a32903cc72df20f206d3
>   samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 81742bc93bcbca5a2ed43c701bcd5d3f40d79bfe
>   samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala 1ee5c06fbd1be5e4ce944a16454c8bd32459d395
>   samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala a8e5d36921464a2e36693279e8083e4544c4e289
>   samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala 9a8e1450a6cc14713817f719cfa56a0e5c97a6f6
>   samza-core/src/test/scala/org/apache/samza/serializers/TestJsonSerde.scala 60460713a2d4f7b7b389f21c1450d45c1afaa0f4
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala 5d8ee4fa74be9ba36956c11ae33573be2d2d5826
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala 4a1b31f025ba7b05a7b46041aa8e12074599ce24
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala 7fc6d89b5d703a7c10a212aaa8d3f9231996b897
>   samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 02a6ea94daf9eb597c9ecef5d63062df5861efc8
>   samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala a34c3f2738855dbf3737639c33846fcad23bd3b9
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b790be17cfe08da28220ffb381cbd618ebe25cf0
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala 4f15002325bc0154991f9a35312e903d15ef81e7
>   samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala b76d5ad68640908bef552125d405b467386025f8
>   samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala 7d4bea8398794c2325f9c022074303a83cfb164a
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala 3d1e6ecbb3fd95816c722a68c4f5907120eb20d0
>   samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java 4ef3551f470e77e27bd156e81ce96486f25c21bf
>   samza-test/src/main/config/join/common.properties ad10aac090beb072ecce3546f06279a7a6113970
>   samza-test/src/main/config/negate-number.properties b9f898c745250252461c833adb05e24ece2d4a89
>   samza-test/src/main/config/perf/container-performance.properties 86b2d58e2d4a9d6bcc9bd305bb7d735f7d1dd053
>   samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java c0a20af5a2f4329ad4a2cff378ced3bececbc1cb
>   samza-test/src/main/python/samza_failure_testing.py 198db26528cab8b473f794a922848a60299dc825
>   samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala 45c76e86361b9e1a54cb5fc717126d36b64cf7e8
>   samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala a8b724bf781003142e455fdf1fed2f13d6c18353
>   samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala 03395e2efa0fec723e354177d06bfacf7d2a9215
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala a1dbe0435ae08c710d4bfc871458ed386e275cd2
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala 8ba435ef2ccf2af64d01eb4bc3b1c362fb03779d
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala b0b6543856cb87888c5a719182ad9576b51bba1a
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 24b11da06a69da734c85720ef39d65ee46d821d5
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala 765f72f4c10bd0f1d1adab28c8ec54d9cbea5fb4
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala 81dea9d6d1921462b200c62dbdf016c0eb2f01b2
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 58f2464211a1fb7ff40f5978fd41f64d088002d0
> 
> Diff: https://reviews.apache.org/r/32147/diff/
> 
> 
> Testing
> -------
> 
> The patch does the following:
> 1. Removes all system-specific checkpoint managers and replaces them with a 
> single entity that uses the coordinator stream for checkpointing.
> 2. Decouples the changelog from the checkpoint manager; a new component, the 
> changelog manager, handles it.
> 3. Passes the checkpoint information from the job coordinator to the containers.
> 4. Modifies the OffsetManager to use the new checkpoint manager and the 
> starting offsets from the job coordinator.
> 
> 
> Tests:
> All existing unit tests and Zopkio tests pass; I have changed some of the 
> existing unit tests as well.
> I plan to add one more unit test to verify checkpoint persistence (a stub is 
> present in TestJobCoordinator).
> 
> Pending Issues:
> 1. The metrics registry is currently not used correctly; I need to pass the 
> right reference to it.
> 2. The coordinator stream does not use the same consumer and producer as 
> the "systemconsumers" and "systemproducers" we have (this will be done after 
> SAMZA-567).
> 3. Remove checkpoint configs related to the stream, migrate them to the 
> coordinator stream, and document them.
> 
> 
> Thanks,
> 
> Naveen Somasundaram
> 
>