-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32147/#review78680
-----------------------------------------------------------



samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
<https://reviews.apache.org/r/32147/#comment127632>

    Can we make this private final, and call new in the constructor?



samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
<https://reviews.apache.org/r/32147/#comment127631>

    All this stuff should be private final, right?



samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
<https://reviews.apache.org/r/32147/#comment127630>

    Prefer two constructors. One with source, and the other without.



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
<https://reviews.apache.org/r/32147/#comment127633>

    TaskName or container id?



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
<https://reviews.apache.org/r/32147/#comment127634>

    TaskName or container id?



samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
<https://reviews.apache.org/r/32147/#comment127635>

    Same comments as CheckpointManager. private finals and second constructor.



samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
<https://reviews.apache.org/r/32147/#comment127636>

    nit: lowercase c



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
<https://reviews.apache.org/r/32147/#comment127732>

    Nit: lowercase 'j'



samza-core/src/main/scala/org/apache/samza/util/Util.scala
<https://reviews.apache.org/r/32147/#comment127734>

    Is it just 200, or all 2XX codes that we're OK with?



samza-core/src/main/scala/org/apache/samza/util/Util.scala
<https://reviews.apache.org/r/32147/#comment127733>

    I don't think that you need to parameterize this (the {}). SLF4J's Logger 
takes an error(String, Throwable) .. 
http://www.slf4j.org/apidocs/org/slf4j/Logger.html


- Chris Riccomini


On April 1, 2015, 9:02 p.m., Naveen Somasundaram wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32147/
> -----------------------------------------------------------
> 
> (Updated April 1, 2015, 9:02 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> SAMZA 465
> 
> 
> Diffs
> -----
> 
>   build.gradle 97de3a28f6379e3862eec845da87587b1d4f742e 
>   samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java  
>   samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java 
> 092cb910b40d312217e86420bf1ddfbaf605e9e5 
>   
> samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java
>  a97ff0919d8205928efee1a2a20780659180849d 
>   samza-api/src/main/java/org/apache/samza/container/TaskName.java 
> 083358686fc69ab45bbc73e898f419224ebc3a9f 
>   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 
> 8995ba30c823bddcdfd3af7100e1440e71ef7998 
>   samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java 
> 6ff1a555f3c48e416bb78e94c5df71eff0a27f3d 
>   
> samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
>  01997ae22641b735cd452a0e89a49219e2874892 
>   samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
>  PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java 
> eb22d2ec5f09ca59790e2871d9bff9745fe925dc 
>   
> samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
>  7dc431c74a3fc2ba80eb47d6c5d87524cb4c9bde 
>   
> samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
>  3517912eaafbf95f8c8cc70ab5869548a56b76e7 
>   
> samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
>  PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala 
> ddc30af7c52d8a4d5c5de02f6757c040b1f31c93 
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 
> a40c87fa7865746a5612c55a4cf24c8d005be7e0 
>   
> samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
>  2a87a6e0cef72179b5383fc824266de1f9606293 
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 
> 3b6685e00837a4aaf809813e62b7e52823bc07a9 
>   samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala 
> 1a2dd4413f56e53dbeeb47b5637d7b0c50522f02 
>   samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala 
> adef09e15c666cb2dbb2e4c5507fc2d605b82a1e 
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 
> 1ca9e2cc5673c537b6a48224809847e94da81fca 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 87fcf589b10fd952df630d498c645159ed381739 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
> c14f2f623bb4bae911dd3085ce428175930e4545 
>   
> samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala
>  10986a49b39cda703a0e54688dc914f2465c79c9 
>   
> samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
>  635c3531a897a369c813821f7b901186e1281ed1 
>   
> samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala
>  PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala 
> 0b720ec4dd83c71fd1ce5071571c7a10babf0ddc 
>   
> samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala 
> b80d34953a54ada461ed1d4b0dcfa07f4435f877 
>   samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 
> 530255e5866bc49ec5ce1a0b7437470cd4e17010 
>   samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala 
> 744eec05857a4ea14c718e3750fb575d3678b1f8 
>   
> samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
>  ec1d749cb5186f788b402877996a4caa37e99362 
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 
> 1a67586eeec95dabfeb3b6881af9b3865c3029ca 
>   
> samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
>  PRE-CREATION 
>   
> samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
>  PRE-CREATION 
>   
> samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java
>  PRE-CREATION 
>   
> samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
>  PRE-CREATION 
>   
> samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
>  PRE-CREATION 
>   
> samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
>  76bc6810a3162a1dc58a36033b3b1f75616bd6ca 
>   samza-core/src/test/resources/test.properties 
> 9348c7de956ebf02f58a163dc6fb391a7e29ae64 
>   
> samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
>  af800dfeedbfea75abaac3f15fd53bc55b743daf 
>   
> samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 
> d18d4c45c5de3b50a24d6c776364e1f589db8f4d 
>   
> samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala
>  10ff1f437220b38810f8a32903cc72df20f206d3 
>   
> samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
> 81742bc93bcbca5a2ed43c701bcd5d3f40d79bfe 
>   
> samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
>  1ee5c06fbd1be5e4ce944a16454c8bd32459d395 
>   
> samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
>  a8e5d36921464a2e36693279e8083e4544c4e289 
>   
> samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala
>  9a8e1450a6cc14713817f719cfa56a0e5c97a6f6 
>   samza-core/src/test/scala/org/apache/samza/serializers/TestJsonSerde.scala 
> 60460713a2d4f7b7b389f21c1450d45c1afaa0f4 
>   
> samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala
>  5d8ee4fa74be9ba36956c11ae33573be2d2d5826 
>   
> samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
>  c9504ecf1d1edbc116ca6d794678062fddeee7fa 
>   
> samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
>  7fc6d89b5d703a7c10a212aaa8d3f9231996b897 
>   samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 
> 02a6ea94daf9eb597c9ecef5d63062df5861efc8 
>   
> samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala 
> a34c3f2738855dbf3737639c33846fcad23bd3b9 
>   
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
>  f783c5790f442928ea83b13359ac6b2a5bcb02e5 
>   
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
>  4f15002325bc0154991f9a35312e903d15ef81e7 
>   
> samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala
>  b76d5ad68640908bef552125d405b467386025f8 
>   
> samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
>  7d4bea8398794c2325f9c022074303a83cfb164a 
>   
> samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
>  0380d35121f4feb99efc7d092b4232030d12db01 
>   
> samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java 
> d3f25c0e03a727e64a774581384ef5aae9ef9c1c 
>   samza-test/src/main/config/join/common.properties 
> ad10aac090beb072ecce3546f06279a7a6113970 
>   samza-test/src/main/config/perf/container-performance.properties 
> 86b2d58e2d4a9d6bcc9bd305bb7d735f7d1dd053 
>   samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java 
> c0a20af5a2f4329ad4a2cff378ced3bececbc1cb 
>   samza-test/src/main/python/samza_failure_testing.py 
> 198db26528cab8b473f794a922848a60299dc825 
>   
> samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
>  45c76e86361b9e1a54cb5fc717126d36b64cf7e8 
>   
> samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
>  d66b3bd070a4cef4b1d3dded1d79a33cbe3fa09b 
>   samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala 
> 03395e2efa0fec723e354177d06bfacf7d2a9215 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala 
> a1dbe0435ae08c710d4bfc871458ed386e275cd2 
>   
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala 
> 8ba435ef2ccf2af64d01eb4bc3b1c362fb03779d 
>   
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
>  b0b6543856cb87888c5a719182ad9576b51bba1a 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 
> 24b11da06a69da734c85720ef39d65ee46d821d5 
>   
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
>  765f72f4c10bd0f1d1adab28c8ec54d9cbea5fb4 
>   
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
>  81dea9d6d1921462b200c62dbdf016c0eb2f01b2 
>   
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
>  58f2464211a1fb7ff40f5978fd41f64d088002d0 
> 
> Diff: https://reviews.apache.org/r/32147/diff/
> 
> 
> Testing
> -------
> 
> The patch does the following:
> 1. Removes all checkpointmanagers specific to the system and replaces them to 
> a single entity that uses coordinator stream to do the checkpointing
> 2. Decoupled change log from checkpoint manager, a new component called 
> changelog manager will do that.
> 3. Passes the checkpoint information to the containers from the jobcoordinator
> 4. Modifies the offsetmanger to use the new checkpoint manager and starting 
> offsets from jobcoordinator.
> 
> 
> Tests:
> All existing Unit tests and Zopkio tests pass, I have changed some of 
> existing Unit tests as well.
> Plan to add one more Unit test to verify checkpoint persistence (Stub present 
> in TestJobCoordinator)
> 
> Pending Issues:
> 1. The metrics registry is right now not used correctly, I need to pass the 
> right reference to it
> 2. The coordinator stream does not use the same consumer and producer from 
> the "systemconsumers" and "systemproducers" we have (This will be done after 
> SAMZA-567)
> 3. Remove checkpoint configs related to the stream, migrate them to the 
> coordinator stream and document them.
> 
> 
> Thanks,
> 
> Naveen Somasundaram
> 
>

Reply via email to