-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32147/#review81995
-----------------------------------------------------------


samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
<https://reviews.apache.org/r/32147/#comment132563>

    Why do we need to bootstrap every time we call this method?


samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
<https://reviews.apache.org/r/32147/#comment132566>

    typo, SetChangelogMapping?


samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
<https://reviews.apache.org/r/32147/#comment132567>

    typo: they -> the


samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
<https://reviews.apache.org/r/32147/#comment132564>

    If this is for the changelog of the store, how do we differentiate two stores in the same TaskName and Container?


samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
<https://reviews.apache.org/r/32147/#comment132568>

    No default value?


samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
<https://reviews.apache.org/r/32147/#comment132561>

    This doc is a little vague. Though it only pays attention to the config messages, it is actually also used by another method (getBootstrappedStream) that is called from CheckpointManager.


samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
<https://reviews.apache.org/r/32147/#comment132558>

    Will this bootstrappedStreamSet overflow or take a lot of memory? We are putting all the messages in the stream into this map, and once we include the checkpoint messages it can be huge.


samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
<https://reviews.apache.org/r/32147/#comment132562>

    check if (isBoostrapped)?

    The same question applies to "public Checkpoint readLastCheckpoint(TaskName taskName)".


samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
<https://reviews.apache.org/r/32147/#comment132569>

    No default value?


samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
<https://reviews.apache.org/r/32147/#comment132571>

    Actually, we do not need the consumer after bootstrapping, right? Do we stop it?


samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
<https://reviews.apache.org/r/32147/#comment132570>

    I think it is the same question: do we need to bootstrap again for the changelog?


samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
<https://reviews.apache.org/r/32147/#comment132572>

    Same question I raised above: how do we differentiate more than one store?


samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
<https://reviews.apache.org/r/32147/#comment132573>

    Move it to the beginning of this class?


samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
<https://reviews.apache.org/r/32147/#comment132574>

    Use CHECKPOINT_SEGMENT_BYTES?


samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/32147/#comment132575>

    Could you add a comment for this? It is a little confusing.


samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
<https://reviews.apache.org/r/32147/#comment132576>

    Will the old config be overwritten when we produce the new config?


samza-core/src/main/scala/org/apache/samza/util/Util.scala
<https://reviews.apache.org/r/32147/#comment132577>

    typo: jobd -> job; format is of -> format of


- Yan Fang


On April 10, 2015, 3:13 a.m., Naveen Somasundaram wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail.
To reply, visit:
> https://reviews.apache.org/r/32147/
> -----------------------------------------------------------
> 
> (Updated April 10, 2015, 3:13 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> SAMZA-465
> 
> 
> Diffs
> -----
> 
>   build.gradle 97de3a28f6379e3862eec845da87587b1d4f742e
>   samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
>   samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java 092cb910b40d312217e86420bf1ddfbaf605e9e5
>   samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java a97ff0919d8205928efee1a2a20780659180849d
>   samza-api/src/main/java/org/apache/samza/container/TaskName.java 083358686fc69ab45bbc73e898f419224ebc3a9f
>   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 8995ba30c823bddcdfd3af7100e1440e71ef7998
>   samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java 6ff1a555f3c48e416bb78e94c5df71eff0a27f3d
>   samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 01997ae22641b735cd452a0e89a49219e2874892
>   samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java eb22d2ec5f09ca59790e2871d9bff9745fe925dc
>   samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java 7dc431c74a3fc2ba80eb47d6c5d87524cb4c9bde
>   samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java 3517912eaafbf95f8c8cc70ab5869548a56b76e7
>   samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala ddc30af7c52d8a4d5c5de02f6757c040b1f31c93
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala a40c87fa7865746a5612c55a4cf24c8d005be7e0
>   samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala 2a87a6e0cef72179b5383fc824266de1f9606293
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 3b6685e00837a4aaf809813e62b7e52823bc07a9
>   samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala 1a2dd4413f56e53dbeeb47b5637d7b0c50522f02
>   samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala adef09e15c666cb2dbb2e4c5507fc2d605b82a1e
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 1ca9e2cc5673c537b6a48224809847e94da81fca
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 5416dd620b2f65ffb09cf5f8c07b1a547df82bab
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala c14f2f623bb4bae911dd3085ce428175930e4545
>   samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala 10986a49b39cda703a0e54688dc914f2465c79c9
>   samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala 635c3531a897a369c813821f7b901186e1281ed1
>   samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala 0b720ec4dd83c71fd1ce5071571c7a10babf0ddc
>   samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b80d34953a54ada461ed1d4b0dcfa07f4435f877
>   samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 530255e5866bc49ec5ce1a0b7437470cd4e17010
>   samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala 744eec05857a4ea14c718e3750fb575d3678b1f8
>   samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala ec1d749cb5186f788b402877996a4caa37e99362
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 1a67586eeec95dabfeb3b6881af9b3865c3029ca
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java 76bc6810a3162a1dc58a36033b3b1f75616bd6ca
>   samza-core/src/test/resources/test.properties 9348c7de956ebf02f58a163dc6fb391a7e29ae64
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala af800dfeedbfea75abaac3f15fd53bc55b743daf
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala d18d4c45c5de3b50a24d6c776364e1f589db8f4d
>   samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala 10ff1f437220b38810f8a32903cc72df20f206d3
>   samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala cab31ca6452e45c73808f20b12d39a30c117119a
>   samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala 1ee5c06fbd1be5e4ce944a16454c8bd32459d395
>   samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala a8e5d36921464a2e36693279e8083e4544c4e289
>   samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala 9a8e1450a6cc14713817f719cfa56a0e5c97a6f6
>   samza-core/src/test/scala/org/apache/samza/serializers/TestJsonSerde.scala 60460713a2d4f7b7b389f21c1450d45c1afaa0f4
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala 5d8ee4fa74be9ba36956c11ae33573be2d2d5826
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala c9504ecf1d1edbc116ca6d794678062fddeee7fa
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala 3dfa26a23c88f9b5b8ffbfd64b59c4061a8ed2e7
>   samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 02a6ea94daf9eb597c9ecef5d63062df5861efc8
>   samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala a34c3f2738855dbf3737639c33846fcad23bd3b9
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala f783c5790f442928ea83b13359ac6b2a5bcb02e5
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala c84ceb75b84eb6d2ce115396ba54cf9e455d905c
>   samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala b76d5ad68640908bef552125d405b467386025f8
>   samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala 7d4bea8398794c2325f9c022074303a83cfb164a
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala 0380d35121f4feb99efc7d092b4232030d12db01
>   samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java d3f25c0e03a727e64a774581384ef5aae9ef9c1c
>   samza-test/src/main/config/join/common.properties ad10aac090beb072ecce3546f06279a7a6113970
>   samza-test/src/main/config/negate-number.properties b9f898c745250252461c833adb05e24ece2d4a89
>   samza-test/src/main/config/perf/container-performance.properties 86b2d58e2d4a9d6bcc9bd305bb7d735f7d1dd053
>   samza-test/src/main/config/perf/kafka-read-write-performance.properties 122b14aaecc2d221ac8944d04d508e7f83ede5ab
>   samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java c0a20af5a2f4329ad4a2cff378ced3bececbc1cb
>   samza-test/src/main/python/samza_failure_testing.py 198db26528cab8b473f794a922848a60299dc825
>   samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala 45c76e86361b9e1a54cb5fc717126d36b64cf7e8
>   samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala d66b3bd070a4cef4b1d3dded1d79a33cbe3fa09b
>   samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala 03395e2efa0fec723e354177d06bfacf7d2a9215
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala a1dbe0435ae08c710d4bfc871458ed386e275cd2
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala 8ba435ef2ccf2af64d01eb4bc3b1c362fb03779d
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala b0b6543856cb87888c5a719182ad9576b51bba1a
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 24b11da06a69da734c85720ef39d65ee46d821d5
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala 765f72f4c10bd0f1d1adab28c8ec54d9cbea5fb4
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala 81dea9d6d1921462b200c62dbdf016c0eb2f01b2
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 58f2464211a1fb7ff40f5978fd41f64d088002d0
> 
> Diff: https://reviews.apache.org/r/32147/diff/
> 
> 
> Testing
> -------
> 
> The patch does the following:
> 1. Removes all system-specific checkpoint managers and replaces them with a
> single entity that uses the coordinator stream to do the checkpointing.
> 2. Decouples the changelog from the checkpoint manager; a new component called
> the changelog manager handles it.
> 3. Passes the checkpoint information to the containers from the JobCoordinator.
> 4. Modifies the OffsetManager to use the new checkpoint manager and the
> starting offsets from the JobCoordinator.
> 
> 
> Tests:
> All existing unit tests and Zopkio tests pass. I have changed some of the
> existing unit tests as well.
> I plan to add one more unit test to verify checkpoint persistence (a stub is
> present in TestJobCoordinator).
> 
> Pending Issues:
> 1. The metrics registry is not used correctly right now; I need to pass the
> right reference to it.
> 2. The coordinator stream does not use the same consumer and producer as the
> "systemconsumers" and "systemproducers" we have (this will be done after
> SAMZA-567).
> 3. Remove the checkpoint configs related to the stream, migrate them to the
> coordinator stream, and document them.
> 
> 
> Thanks,
> 
> Naveen Somasundaram
> 
>
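
To make the bootstrap questions in the review comments concrete (bootstrapping on every call, the size of the bootstrapped set, and telling two stores in the same task apart), here is a minimal sketch of one possible approach. It is plain Java and does not use the classes in this patch: `BootstrapOnceSketch`, the `changelogKey` format, and the in-memory list standing in for the coordinator stream are all hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a coordinator-stream reader; NOT a Samza class.
final class BootstrapOnceSketch {
    // Each entry is {key, value}; the list plays the role of the stream (assumption).
    private final List<String[]> stream;
    // Only the latest value per key is kept, so memory grows with the number
    // of distinct keys rather than with the number of messages in the stream.
    private final Map<String, String> latestByKey = new HashMap<>();
    private boolean isBootstrapped = false;
    private int passes = 0; // number of full reads, kept only for the demo

    BootstrapOnceSketch(List<String[]> stream) {
        this.stream = stream;
    }

    // Hypothetical key format: including the store name separates two stores
    // that belong to the same task.
    static String changelogKey(String taskName, String storeName) {
        return "changelog/" + taskName + "/" + storeName;
    }

    // Reads the whole stream on the first call; later calls return immediately.
    void bootstrap() {
        if (isBootstrapped) {
            return;
        }
        for (String[] message : stream) {
            latestByKey.put(message[0], message[1]); // newer overwrites older
        }
        passes++;
        isBootstrapped = true;
    }

    String readLast(String key) {
        bootstrap(); // cheap after the first call
        return latestByKey.get(key);
    }

    int fullReads() {
        return passes;
    }

    public static void main(String[] args) {
        List<String[]> stream = new ArrayList<>();
        stream.add(new String[] {changelogKey("task-0", "store-a"), "partition-0"});
        stream.add(new String[] {changelogKey("task-0", "store-b"), "partition-1"});
        stream.add(new String[] {changelogKey("task-0", "store-a"), "partition-2"});

        BootstrapOnceSketch reader = new BootstrapOnceSketch(stream);
        System.out.println(reader.readLast(changelogKey("task-0", "store-a"))); // partition-2
        System.out.println(reader.readLast(changelogKey("task-0", "store-b"))); // partition-1
        System.out.println(reader.fullReads()); // 1
    }
}
```

Whether a guard like this is safe depends on whether new messages can arrive after the first read. If they can, the reader would need to keep consuming instead of stopping after bootstrap, which the sketch does not attempt.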