-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32147/#review82006
-----------------------------------------------------------



samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
<https://reviews.apache.org/r/32147/#comment132584>

    We need to bootstrap every time because, if the container fails and comes 
back up, we need to fetch the latest checkpoint from the coordinator 
stream before we serve. This logic lives in JobCoordinator: for every HTTP 
request, we refresh the JobModel, which ensures we always serve the latest 
JobModel (which has the latest checkpoint, because we bootstrap every time).
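
    A minimal sketch of the bootstrap-on-every-request idea, in Java. The class 
and method names (CheckpointLogSketch, JobModelServerSketch, handleRequest) are 
hypothetical stand-ins for illustration, not the JobCoordinator or coordinator 
stream API from the patch:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the coordinator stream: an append-only log of
// (taskName, offset) checkpoint messages. Not the real Samza API.
class CheckpointLogSketch {
    private final List<String[]> messages = new ArrayList<>();

    void append(String taskName, String offset) {
        messages.add(new String[] {taskName, offset});
    }

    // "Bootstrap": replay the whole log from the start. Later messages for
    // the same task overwrite earlier ones, so the result is the latest view.
    Map<String, String> bootstrap() {
        Map<String, String> latest = new HashMap<>();
        for (String[] message : messages) {
            latest.put(message[0], message[1]);
        }
        return latest;
    }
}

// Hypothetical stand-in for the coordinator's HTTP handler.
class JobModelServerSketch {
    private final CheckpointLogSketch log;

    JobModelServerSketch(CheckpointLogSketch log) {
        this.log = log;
    }

    // Called once per request: rebuild the view instead of caching it, so a
    // restarted container always receives the latest checkpoint.
    Map<String, String> handleRequest() {
        return log.bootstrap();
    }

    public static void main(String[] args) {
        CheckpointLogSketch log = new CheckpointLogSketch();
        JobModelServerSketch server = new JobModelServerSketch(log);
        log.append("task-0", "10");
        System.out.println(server.handleRequest());
        log.append("task-0", "25"); // a newer checkpoint arrives
        System.out.println(server.handleRequest());
    }
}
```

    The trade-off in this sketch is that every request pays for a full replay; 
in exchange, no request can observe a stale checkpoint.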



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
<https://reviews.apache.org/r/32147/#comment132594>

    This is consistent with how it previously worked in KafkaCheckpointManager. 
The partition is per task, but the topic is not the same: each task uses the 
same partition for all of its stores, but a different changelog topic name for 
each store. 
    
    Relevant part of the code from SamzaContainer:
    ```
        val changeLogSystemStreams = config
          .getStoreNames
          .filter(config.getChangelogStream(_).isDefined)
          .map(name => (name, config.getChangelogStream(name).get)).toMap
          .mapValues(Util.getSystemStreamFromNames(_))
    ```
    
    Each store's stream then uses the same partition, which is fetched 
from this mapping.
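
    To illustrate the relationship, here is a small Java sketch. It assumes a 
hypothetical helper (ChangelogPartitionSketch.changelogTargets) and made-up 
store and topic names; it is not code from the patch. Each store keeps its own 
changelog topic, and all of them share the one partition assigned to the task:

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ChangelogPartitionSketch {
    // Pairs every store's changelog topic with the single partition assigned
    // to the task. The "topic/partition" string format is illustrative only.
    static Map<String, String> changelogTargets(Map<String, String> storeToTopic,
                                                int taskPartition) {
        Map<String, String> targets = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : storeToTopic.entrySet()) {
            targets.put(entry.getKey(), entry.getValue() + "/" + taskPartition);
        }
        return targets;
    }

    public static void main(String[] args) {
        Map<String, String> stores = new LinkedHashMap<>();
        stores.put("store-a", "changelog-a"); // hypothetical store and topic names
        stores.put("store-b", "changelog-b");
        // Both stores of the task write to partition 3 of their own topic.
        System.out.println(changelogTargets(stores, 3));
    }
}
```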



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
<https://reviews.apache.org/r/32147/#comment132595>

    A boolean field defaults to false in Java. Or is there something else you 
are referring to?
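
    For reference, a tiny Java sketch of the default. The class and field names 
are hypothetical and not taken from CoordinatorStreamSystemConsumer. Note that 
this applies to fields and array elements only; a local boolean has no default 
and must be assigned before it is read:

```java
class BooleanDefaultSketch {
    // No initializer: an instance field of type boolean starts out as false.
    boolean isBootstrapped; // hypothetical field name

    // Elements of a newly allocated boolean[] also start out as false.
    static boolean firstElementOfNewArray() {
        return new boolean[1][0];
    }

    public static void main(String[] args) {
        System.out.println(new BooleanDefaultSketch().isBootstrapped);
        System.out.println(firstElementOfNewArray());
    }
}
```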


- Naveen Somasundaram


On April 10, 2015, 3:13 a.m., Naveen Somasundaram wrote:
> 
> (Updated April 10, 2015, 3:13 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> SAMZA-465
> 
> 
> Diffs
> -----
> 
>   build.gradle 97de3a28f6379e3862eec845da87587b1d4f742e
>   samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
>   samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java 092cb910b40d312217e86420bf1ddfbaf605e9e5
>   samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java a97ff0919d8205928efee1a2a20780659180849d
>   samza-api/src/main/java/org/apache/samza/container/TaskName.java 083358686fc69ab45bbc73e898f419224ebc3a9f
>   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 8995ba30c823bddcdfd3af7100e1440e71ef7998
>   samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java 6ff1a555f3c48e416bb78e94c5df71eff0a27f3d
>   samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 01997ae22641b735cd452a0e89a49219e2874892
>   samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java eb22d2ec5f09ca59790e2871d9bff9745fe925dc
>   samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java 7dc431c74a3fc2ba80eb47d6c5d87524cb4c9bde
>   samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java 3517912eaafbf95f8c8cc70ab5869548a56b76e7
>   samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala ddc30af7c52d8a4d5c5de02f6757c040b1f31c93
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala a40c87fa7865746a5612c55a4cf24c8d005be7e0
>   samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala 2a87a6e0cef72179b5383fc824266de1f9606293
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 3b6685e00837a4aaf809813e62b7e52823bc07a9
>   samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala 1a2dd4413f56e53dbeeb47b5637d7b0c50522f02
>   samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala adef09e15c666cb2dbb2e4c5507fc2d605b82a1e
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 1ca9e2cc5673c537b6a48224809847e94da81fca
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 5416dd620b2f65ffb09cf5f8c07b1a547df82bab
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala c14f2f623bb4bae911dd3085ce428175930e4545
>   samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala 10986a49b39cda703a0e54688dc914f2465c79c9
>   samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala 635c3531a897a369c813821f7b901186e1281ed1
>   samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala 0b720ec4dd83c71fd1ce5071571c7a10babf0ddc
>   samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b80d34953a54ada461ed1d4b0dcfa07f4435f877
>   samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 530255e5866bc49ec5ce1a0b7437470cd4e17010
>   samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala 744eec05857a4ea14c718e3750fb575d3678b1f8
>   samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala ec1d749cb5186f788b402877996a4caa37e99362
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 1a67586eeec95dabfeb3b6881af9b3865c3029ca
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java 76bc6810a3162a1dc58a36033b3b1f75616bd6ca
>   samza-core/src/test/resources/test.properties 9348c7de956ebf02f58a163dc6fb391a7e29ae64
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala af800dfeedbfea75abaac3f15fd53bc55b743daf
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala d18d4c45c5de3b50a24d6c776364e1f589db8f4d
>   samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala 10ff1f437220b38810f8a32903cc72df20f206d3
>   samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala cab31ca6452e45c73808f20b12d39a30c117119a
>   samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala 1ee5c06fbd1be5e4ce944a16454c8bd32459d395
>   samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala a8e5d36921464a2e36693279e8083e4544c4e289
>   samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala 9a8e1450a6cc14713817f719cfa56a0e5c97a6f6
>   samza-core/src/test/scala/org/apache/samza/serializers/TestJsonSerde.scala 60460713a2d4f7b7b389f21c1450d45c1afaa0f4
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala 5d8ee4fa74be9ba36956c11ae33573be2d2d5826
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala c9504ecf1d1edbc116ca6d794678062fddeee7fa
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala 3dfa26a23c88f9b5b8ffbfd64b59c4061a8ed2e7
>   samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 02a6ea94daf9eb597c9ecef5d63062df5861efc8
>   samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala a34c3f2738855dbf3737639c33846fcad23bd3b9
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala f783c5790f442928ea83b13359ac6b2a5bcb02e5
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala c84ceb75b84eb6d2ce115396ba54cf9e455d905c
>   samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala b76d5ad68640908bef552125d405b467386025f8
>   samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala 7d4bea8398794c2325f9c022074303a83cfb164a
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala 0380d35121f4feb99efc7d092b4232030d12db01
>   samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java d3f25c0e03a727e64a774581384ef5aae9ef9c1c
>   samza-test/src/main/config/join/common.properties ad10aac090beb072ecce3546f06279a7a6113970
>   samza-test/src/main/config/negate-number.properties b9f898c745250252461c833adb05e24ece2d4a89
>   samza-test/src/main/config/perf/container-performance.properties 86b2d58e2d4a9d6bcc9bd305bb7d735f7d1dd053
>   samza-test/src/main/config/perf/kafka-read-write-performance.properties 122b14aaecc2d221ac8944d04d508e7f83ede5ab
>   samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java c0a20af5a2f4329ad4a2cff378ced3bececbc1cb
>   samza-test/src/main/python/samza_failure_testing.py 198db26528cab8b473f794a922848a60299dc825
>   samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala 45c76e86361b9e1a54cb5fc717126d36b64cf7e8
>   samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala d66b3bd070a4cef4b1d3dded1d79a33cbe3fa09b
>   samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala 03395e2efa0fec723e354177d06bfacf7d2a9215
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala a1dbe0435ae08c710d4bfc871458ed386e275cd2
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala 8ba435ef2ccf2af64d01eb4bc3b1c362fb03779d
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala b0b6543856cb87888c5a719182ad9576b51bba1a
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 24b11da06a69da734c85720ef39d65ee46d821d5
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala 765f72f4c10bd0f1d1adab28c8ec54d9cbea5fb4
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala 81dea9d6d1921462b200c62dbdf016c0eb2f01b2
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 58f2464211a1fb7ff40f5978fd41f64d088002d0
> 
> Diff: https://reviews.apache.org/r/32147/diff/
> 
> 
> Testing
> -------
> 
> The patch does the following:
> 1. Removes all system-specific checkpoint managers and replaces them with 
> a single entity that uses the coordinator stream for checkpointing.
> 2. Decouples the changelog from the checkpoint manager; a new component called 
> the changelog manager handles it.
> 3. Passes the checkpoint information to the containers from the JobCoordinator.
> 4. Modifies the OffsetManager to use the new checkpoint manager and the 
> starting offsets from the JobCoordinator.
> 
> 
> Tests:
> All existing unit tests and Zopkio tests pass. I have changed some of the 
> existing unit tests as well.
> I plan to add one more unit test to verify checkpoint persistence (stub 
> present in TestJobCoordinator).
> 
> Pending Issues:
> 1. The metrics registry is not used correctly right now; I need to pass the 
> right reference to it.
> 2. The coordinator stream does not use the same consumer and producer from 
> the "systemconsumers" and "systemproducers" we have (this will be done after 
> SAMZA-567).
> 3. Remove checkpoint configs related to the stream, migrate them to the 
> coordinator stream, and document them.
> 
> 
> Thanks,
> 
> Naveen Somasundaram
> 
>
