-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32147/#review77607
-----------------------------------------------------------



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
<https://reviews.apache.org/r/32147/#comment125741>

    It's not in the API module, it's in core. Let me know if you see something 
that indicates otherwise.



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
<https://reviews.apache.org/r/32147/#comment125742>

    This must have come from the auto-merge during the final rebase. I went over 
the diff once, but had to rebase again. I will review the code again.



samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
<https://reviews.apache.org/r/32147/#comment125836>

    done!



samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java
<https://reviews.apache.org/r/32147/#comment125839>

    I don't think so. I'll delete it and see if anything needs it.



samza-api/src/main/java/org/apache/samza/container/TaskName.java
<https://reviews.apache.org/r/32147/#comment125837>

    sure



samza-core/src/main/java/org/apache/samza/changelog/ChangelogManager.java
<https://reviews.apache.org/r/32147/#comment125845>

    done



samza-core/src/main/java/org/apache/samza/changelog/ChangelogManager.java
<https://reviews.apache.org/r/32147/#comment125846>

    Agreed, the name makes it look like it's managing the whole changelog. I'll 
rename it to ChangelogPartitionManager, which makes more sense since that is 
really all it does :)



samza-core/src/main/java/org/apache/samza/changelog/ChangelogManager.java
<https://reviews.apache.org/r/32147/#comment126012>

    I am going to remove it. It doesn't make sense to have the task name here, 
since we always read all the mappings.



samza-core/src/main/java/org/apache/samza/changelog/ChangelogManager.java
<https://reviews.apache.org/r/32147/#comment126017>

    fixed



samza-core/src/main/java/org/apache/samza/changelog/ChangelogManager.java
<https://reviews.apache.org/r/32147/#comment126018>

    fixed



samza-core/src/main/java/org/apache/samza/changelog/ChangelogManager.java
<https://reviews.apache.org/r/32147/#comment126021>

    I was trying not to break the semantics of "register only calling 
register." However, it re-reads the metadata on every call, which is not great 
for performance. I'll probably just set a boolean so the consumer is not 
re-registered.
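
    A minimal sketch of the boolean guard, assuming a manager that registers a 
single underlying consumer. RegisterOnceManager, registerConsumer() and the 
counter are hypothetical names for illustration, not the code in this patch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not the actual ChangelogManager code.
class RegisterOnceManager {
  private final List<String> registeredTasks = new ArrayList<>();
  // Set after the first registration so later register() calls skip it.
  private boolean consumerRegistered = false;
  // Counts consumer registrations, only so the behavior is observable.
  private int consumerRegistrations = 0;

  void register(String taskName) {
    registeredTasks.add(taskName);
    if (!consumerRegistered) {
      registerConsumer();
      consumerRegistered = true;
    }
  }

  // Stand-in for registering the underlying consumer, which would read
  // the stream metadata; in this sketch it only increments a counter.
  private void registerConsumer() {
    consumerRegistrations++;
  }

  int consumerRegistrations() { return consumerRegistrations; }

  List<String> registeredTasks() { return registeredTasks; }
}
```

    register() still only records the registration from the caller's point of 
view; the flag just keeps the expensive metadata read from happening on every 
call.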



samza-core/src/main/java/org/apache/samza/changelog/ChangelogManager.java
<https://reviews.apache.org/r/32147/#comment126041>

    Added a new method in the coordinator stream consumer that accepts message 
types and returns only the messages of those types.
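
    A rough sketch of filtering bootstrapped messages by type. The Message class, 
the type strings and getMessagesOfTypes() are hypothetical stand-ins, not the 
actual coordinator stream message format or method name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of a consumer that filters bootstrapped messages by type.
class TypeFilterSketch {
  // Minimal stand-in for a coordinator stream message: a type plus a payload.
  static final class Message {
    final String type;
    final String value;

    Message(String type, String value) {
      this.type = type;
      this.value = value;
    }
  }

  // Messages read while bootstrapping, in stream order.
  private final List<Message> bootstrapped = new ArrayList<>();

  void add(Message message) {
    bootstrapped.add(message);
  }

  // Returns only the messages whose type is one of the requested types,
  // so each manager does not have to filter the full stream itself.
  List<Message> getMessagesOfTypes(Set<String> types) {
    List<Message> result = new ArrayList<>();
    for (Message message : bootstrapped) {
      if (types.contains(message.type)) {
        result.add(message);
      }
    }
    return result;
  }
}
```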



samza-core/src/main/java/org/apache/samza/changelog/ChangelogManager.java
<https://reviews.apache.org/r/32147/#comment126042>

    Changed



samza-core/src/main/java/org/apache/samza/changelog/ChangelogManager.java
<https://reviews.apache.org/r/32147/#comment126043>

    fixed



samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
<https://reviews.apache.org/r/32147/#comment126050>

    After moving the bootstrap logic to the CoordinatorStreamConsumer, the only 
common pieces are start, stop and register. It looks more like a slightly 
bulkier version of your latter suggestion. 
    
    For now, I'd prefer to revisit this when I do the host affinity mapping 
(for state reuse), which I'll be working on next. At that point I'll see what 
the common pattern is and move it to a base class.
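
    One possible shape for that base class, sketched under the assumption that 
start, stop and register remain the only shared pieces. 
CoordinatorStreamManagerBase, onStart()/onStop() and CountingManager are 
hypothetical names, not existing Samza classes.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical base class holding the start/stop/register lifecycle that the
// managers share; the names are illustrative, not existing Samza classes.
abstract class CoordinatorStreamManagerBase {
  private final Set<String> registeredTasks = new LinkedHashSet<>();

  // Common part: remember which tasks this manager cares about.
  void register(String taskName) {
    registeredTasks.add(taskName);
  }

  // Common part: hand the registered tasks to the subclass-specific startup.
  void start() {
    onStart(Collections.unmodifiableSet(registeredTasks));
  }

  void stop() {
    onStop();
  }

  // Manager-specific behavior, e.g. reading checkpoints or changelog mappings.
  protected abstract void onStart(Set<String> registeredTasks);

  protected abstract void onStop();
}

// Hypothetical subclass used only to exercise the template.
class CountingManager extends CoordinatorStreamManagerBase {
  int startedWith = -1;
  boolean stopped = false;

  @Override
  protected void onStart(Set<String> registeredTasks) {
    startedWith = registeredTasks.size();
  }

  @Override
  protected void onStop() {
    stopped = true;
  }
}
```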



samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
<https://reviews.apache.org/r/32147/#comment126052>

    done



samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
<https://reviews.apache.org/r/32147/#comment126053>

    moved to the consumer



samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
<https://reviews.apache.org/r/32147/#comment126057>

    This contains check selects the checkpoints the CheckpointManager is 
interested in (i.e., the ones it registered for through register calls). 
Checkpoints we have not registered for fall into the else branch, where we do 
nothing. Let me know if you see any other problem with it.
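
    A simplified sketch of that contains/else logic, assuming checkpoints arrive 
one message at a time keyed by task name and that a later message for the same 
task replaces an earlier one. Checkpoints are reduced to strings, and the class 
and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the "contains" filter; checkpoints are simplified to strings.
class CheckpointFilterSketch {
  private final Set<String> registeredTasks = new HashSet<>();
  private final Map<String, String> latestCheckpoints = new HashMap<>();

  void register(String taskName) {
    registeredTasks.add(taskName);
  }

  // Invoked for each checkpoint message read from the stream.
  void onCheckpointMessage(String taskName, String checkpoint) {
    if (registeredTasks.contains(taskName)) {
      // Registered task: keep the checkpoint; a later message replaces an earlier one.
      latestCheckpoints.put(taskName, checkpoint);
    } else {
      // Not registered for this task: intentionally do nothing.
    }
  }

  // Returns null if no checkpoint was kept for the task.
  String readLastCheckpoint(String taskName) {
    return latestCheckpoints.get(taskName);
  }
}
```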



samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
<https://reviews.apache.org/r/32147/#comment126058>

    Correct



samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
<https://reviews.apache.org/r/32147/#comment126166>

    You are right, I had written very similar code in Util. I had to duplicate 
it because of a Scala/Java issue. Now that Util has changed, I am switching this 
to use it.



samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
<https://reviews.apache.org/r/32147/#comment126167>

    fixed



samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
<https://reviews.apache.org/r/32147/#comment126169>

    That's a better idea; exposing that new method just for testing was really 
nagging me :)



samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
<https://reviews.apache.org/r/32147/#comment126170>

    changed



samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/32147/#comment126172>

    Correct, changed.



samza-test/src/main/config/join/common.properties
<https://reviews.apache.org/r/32147/#comment126522>

    correct, made it one system



samza-test/src/main/config/negate-number.properties
<https://reviews.apache.org/r/32147/#comment126174>

    Removed, shouldn't be necessary



samza-test/src/main/config/perf/container-performance.properties
<https://reviews.apache.org/r/32147/#comment126533>

    It should, but the code does that automatically, so this is redundant. I'll 
remove it from the config.



samza-test/src/main/python/samza_failure_testing.py
<https://reviews.apache.org/r/32147/#comment126559>

    changed


- Naveen Somasundaram


On March 17, 2015, 11:25 p.m., Naveen Somasundaram wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32147/
> -----------------------------------------------------------
> 
> (Updated March 17, 2015, 11:25 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> SAMZA-465
> 
> 
> Diffs
> -----
> 
>   build.gradle 0a268ac7a3819cf46b54a93e0e3171455371456a 
>   samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java  
>   samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java 
> 092cb910b40d312217e86420bf1ddfbaf605e9e5 
>   
> samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java
>  a97ff0919d8205928efee1a2a20780659180849d 
>   samza-api/src/main/java/org/apache/samza/container/TaskName.java 
> 083358686fc69ab45bbc73e898f419224ebc3a9f 
>   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 
> 8995ba30c823bddcdfd3af7100e1440e71ef7998 
>   samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java 
> 6ff1a555f3c48e416bb78e94c5df71eff0a27f3d 
>   
> samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
>  01997ae22641b735cd452a0e89a49219e2874892 
>   samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
>  PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java 
> eb22d2ec5f09ca59790e2871d9bff9745fe925dc 
>   
> samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
>  7dc431c74a3fc2ba80eb47d6c5d87524cb4c9bde 
>   
> samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
>  3517912eaafbf95f8c8cc70ab5869548a56b76e7 
>   
> samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
>  PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala 
> ddc30af7c52d8a4d5c5de02f6757c040b1f31c93 
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 
> a40c87fa7865746a5612c55a4cf24c8d005be7e0 
>   
> samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
>  2a87a6e0cef72179b5383fc824266de1f9606293 
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 
> 3b6685e00837a4aaf809813e62b7e52823bc07a9 
>   samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala 
> 1a2dd4413f56e53dbeeb47b5637d7b0c50522f02 
>   samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala 
> adef09e15c666cb2dbb2e4c5507fc2d605b82a1e 
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 
> 1ca9e2cc5673c537b6a48224809847e94da81fca 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 275eb1a924d09a0a43efe6273e0d2af9217e1c74 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
> c14f2f623bb4bae911dd3085ce428175930e4545 
>   
> samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala
>  10986a49b39cda703a0e54688dc914f2465c79c9 
>   
> samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
>  635c3531a897a369c813821f7b901186e1281ed1 
>   
> samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala
>  PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala 
> 0b720ec4dd83c71fd1ce5071571c7a10babf0ddc 
>   
> samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala 
> b80d34953a54ada461ed1d4b0dcfa07f4435f877 
>   samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 
> 530255e5866bc49ec5ce1a0b7437470cd4e17010 
>   samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala 
> 744eec05857a4ea14c718e3750fb575d3678b1f8 
>   
> samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
>  ec1d749cb5186f788b402877996a4caa37e99362 
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 
> 1a67586eeec95dabfeb3b6881af9b3865c3029ca 
>   
> samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
>  PRE-CREATION 
>   
> samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
>  PRE-CREATION 
>   
> samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java
>  PRE-CREATION 
>   
> samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
>  PRE-CREATION 
>   
> samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
>  PRE-CREATION 
>   
> samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
>  76bc6810a3162a1dc58a36033b3b1f75616bd6ca 
>   samza-core/src/test/resources/test.properties 
> 9348c7de956ebf02f58a163dc6fb391a7e29ae64 
>   
> samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
>  af800dfeedbfea75abaac3f15fd53bc55b743daf 
>   
> samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 
> d18d4c45c5de3b50a24d6c776364e1f589db8f4d 
>   
> samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala
>  10ff1f437220b38810f8a32903cc72df20f206d3 
>   
> samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
> 81742bc93bcbca5a2ed43c701bcd5d3f40d79bfe 
>   
> samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
>  1ee5c06fbd1be5e4ce944a16454c8bd32459d395 
>   
> samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
>  a8e5d36921464a2e36693279e8083e4544c4e289 
>   
> samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala
>  9a8e1450a6cc14713817f719cfa56a0e5c97a6f6 
>   samza-core/src/test/scala/org/apache/samza/serializers/TestJsonSerde.scala 
> 60460713a2d4f7b7b389f21c1450d45c1afaa0f4 
>   
> samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala
>  5d8ee4fa74be9ba36956c11ae33573be2d2d5826 
>   
> samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
>  4a1b31f025ba7b05a7b46041aa8e12074599ce24 
>   
> samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
>  7fc6d89b5d703a7c10a212aaa8d3f9231996b897 
>   samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 
> 02a6ea94daf9eb597c9ecef5d63062df5861efc8 
>   
> samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala 
> a34c3f2738855dbf3737639c33846fcad23bd3b9 
>   
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
>  b790be17cfe08da28220ffb381cbd618ebe25cf0 
>   
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
>  4f15002325bc0154991f9a35312e903d15ef81e7 
>   
> samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala
>  b76d5ad68640908bef552125d405b467386025f8 
>   
> samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
>  7d4bea8398794c2325f9c022074303a83cfb164a 
>   
> samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
>  3d1e6ecbb3fd95816c722a68c4f5907120eb20d0 
>   
> samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java 
> 4ef3551f470e77e27bd156e81ce96486f25c21bf 
>   samza-test/src/main/config/join/common.properties 
> ad10aac090beb072ecce3546f06279a7a6113970 
>   samza-test/src/main/config/perf/container-performance.properties 
> 86b2d58e2d4a9d6bcc9bd305bb7d735f7d1dd053 
>   samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java 
> c0a20af5a2f4329ad4a2cff378ced3bececbc1cb 
>   samza-test/src/main/python/samza_failure_testing.py 
> 198db26528cab8b473f794a922848a60299dc825 
>   
> samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
>  45c76e86361b9e1a54cb5fc717126d36b64cf7e8 
>   
> samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
>  a8b724bf781003142e455fdf1fed2f13d6c18353 
>   samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala 
> 03395e2efa0fec723e354177d06bfacf7d2a9215 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala 
> a1dbe0435ae08c710d4bfc871458ed386e275cd2 
>   
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala 
> 8ba435ef2ccf2af64d01eb4bc3b1c362fb03779d 
>   
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
>  b0b6543856cb87888c5a719182ad9576b51bba1a 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 
> 24b11da06a69da734c85720ef39d65ee46d821d5 
>   
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
>  765f72f4c10bd0f1d1adab28c8ec54d9cbea5fb4 
>   
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
>  81dea9d6d1921462b200c62dbdf016c0eb2f01b2 
>   
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
>  58f2464211a1fb7ff40f5978fd41f64d088002d0 
> 
> Diff: https://reviews.apache.org/r/32147/diff/
> 
> 
> Testing
> -------
> 
> The patch does the following:
> 1. Removes all system-specific checkpoint managers and replaces them with a 
> single entity that uses the coordinator stream for checkpointing.
> 2. Decouples the changelog from the checkpoint manager; a new component, the 
> changelog manager, handles it instead.
> 3. Passes the checkpoint information from the JobCoordinator to the containers.
> 4. Modifies the OffsetManager to use the new checkpoint manager and the 
> starting offsets from the JobCoordinator.
> 
> 
> Tests:
> All existing unit tests and Zopkio tests pass; I have also changed some of the 
> existing unit tests.
> I plan to add one more unit test to verify checkpoint persistence (a stub is 
> present in TestJobCoordinator).
> 
> Pending Issues:
> 1. The metrics registry is currently not used correctly; I need to pass the 
> right reference to it.
> 2. The coordinator stream does not yet use the same consumers and producers as 
> the existing "systemconsumers" and "systemproducers" (this will be done after 
> SAMZA-567).
> 3. Remove the checkpoint configs related to the stream, migrate them to the 
> coordinator stream, and document them.
> 
> 
> Thanks,
> 
> Naveen Somasundaram
> 
>
