-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32147/#review77607
-----------------------------------------------------------

samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
<https://reviews.apache.org/r/32147/#comment125741>

    It's not in the API, it's in core. Let me know if you see something that indicates otherwise.


samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
<https://reviews.apache.org/r/32147/#comment125742>

    This must have come from an auto-merge on the final rebase. I went over the diff once, but had to rebase again. I will review the code again.


samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
<https://reviews.apache.org/r/32147/#comment125836>

    Done!


samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java
<https://reviews.apache.org/r/32147/#comment125839>

    I don't think so. I'll delete it and see if anything needs it.


samza-api/src/main/java/org/apache/samza/container/TaskName.java
<https://reviews.apache.org/r/32147/#comment125837>

    Sure.


samza-core/src/main/java/org/apache/samza/changelog/ChangelogManager.java
<https://reviews.apache.org/r/32147/#comment125845>

    Done.


samza-core/src/main/java/org/apache/samza/changelog/ChangelogManager.java
<https://reviews.apache.org/r/32147/#comment125846>

    Agreed, the name makes it look like it's managing all of the changelog. I'll rename it to ChangelogPartitionManager, which makes more sense since that is really all it does :)


samza-core/src/main/java/org/apache/samza/changelog/ChangelogManager.java
<https://reviews.apache.org/r/32147/#comment126012>

    I am going to remove it. It doesn't make sense to have the task name here, since we are always reading all mappings.


samza-core/src/main/java/org/apache/samza/changelog/ChangelogManager.java
<https://reviews.apache.org/r/32147/#comment126017>

    Fixed.


samza-core/src/main/java/org/apache/samza/changelog/ChangelogManager.java
<https://reviews.apache.org/r/32147/#comment126018>

    Fixed.


samza-core/src/main/java/org/apache/samza/changelog/ChangelogManager.java
<https://reviews.apache.org/r/32147/#comment126021>

    I was trying not to break the semantics of "register only calling register." It keeps reading the meta information on each call, which is not the best for performance. Maybe I'll just set a boolean so the consumer is not re-registered.


samza-core/src/main/java/org/apache/samza/changelog/ChangelogManager.java
<https://reviews.apache.org/r/32147/#comment126041>

    Added a new method to the coordinator stream consumer that accepts types and returns the filtered messages.


samza-core/src/main/java/org/apache/samza/changelog/ChangelogManager.java
<https://reviews.apache.org/r/32147/#comment126042>

    Changed.


samza-core/src/main/java/org/apache/samza/changelog/ChangelogManager.java
<https://reviews.apache.org/r/32147/#comment126043>

    Fixed.


samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
<https://reviews.apache.org/r/32147/#comment126050>

    After moving the bootstrap to the CoordinatorStreamConsumer, the only common entities are start, stop and register. It looks more like a slightly bulky version of your latter suggestion. At this point, I am in favor of revisiting this when I do the host affinity mapping (for state reuse), which I'll be working on after this. I'll see what the pattern is and move it to a base class.


samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
<https://reviews.apache.org/r/32147/#comment126052>

    Done.


samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
<https://reviews.apache.org/r/32147/#comment126053>

    Moved to the consumer.


samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
<https://reviews.apache.org/r/32147/#comment126057>

    This contains check selects the checkpoints the checkpoint manager is interested in (through register calls). If there are checkpoints we have not registered for, it enters the else condition, in which case we don't do anything. Let me know if you see any other problem with it.


samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
<https://reviews.apache.org/r/32147/#comment126058>

    Correct.


samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
<https://reviews.apache.org/r/32147/#comment126166>

    You are right, I had written very similar code in Util. I had to duplicate it because of a Scala/Java issue. I am changing it to use Util, now that it has changed.


samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
<https://reviews.apache.org/r/32147/#comment126167>

    Fixed.


samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
<https://reviews.apache.org/r/32147/#comment126169>

    That's a better idea. Exposing that new method just for testing was really nagging me :)


samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
<https://reviews.apache.org/r/32147/#comment126170>

    Changed.


samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/32147/#comment126172>

    Correct, changed.


samza-test/src/main/config/join/common.properties
<https://reviews.apache.org/r/32147/#comment126522>

    Correct, made it one system.


samza-test/src/main/config/negate-number.properties
<https://reviews.apache.org/r/32147/#comment126174>

    Removed, shouldn't be necessary.


samza-test/src/main/config/perf/container-performance.properties
<https://reviews.apache.org/r/32147/#comment126533>

    It should, but the code does that automatically, so this is redundant. I'll remove it from the config.


samza-test/src/main/python/samza_failure_testing.py
<https://reviews.apache.org/r/32147/#comment126559>

    Changed.


- Naveen Somasundaram


On March 17, 2015, 11:25 p.m., Naveen Somasundaram wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32147/
> -----------------------------------------------------------
> 
> (Updated March 17, 2015, 11:25 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> SAMZA 465
> 
> 
> Diffs
> -----
> 
>   build.gradle 0a268ac7a3819cf46b54a93e0e3171455371456a
>   samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
>   samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java 092cb910b40d312217e86420bf1ddfbaf605e9e5
>   samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java a97ff0919d8205928efee1a2a20780659180849d
>   samza-api/src/main/java/org/apache/samza/container/TaskName.java 083358686fc69ab45bbc73e898f419224ebc3a9f
>   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 8995ba30c823bddcdfd3af7100e1440e71ef7998
>   samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java 6ff1a555f3c48e416bb78e94c5df71eff0a27f3d
>   samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 01997ae22641b735cd452a0e89a49219e2874892
>   samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java eb22d2ec5f09ca59790e2871d9bff9745fe925dc
>   samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java 7dc431c74a3fc2ba80eb47d6c5d87524cb4c9bde
>   samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java 3517912eaafbf95f8c8cc70ab5869548a56b76e7
>   samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala ddc30af7c52d8a4d5c5de02f6757c040b1f31c93
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala a40c87fa7865746a5612c55a4cf24c8d005be7e0
>   samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala 2a87a6e0cef72179b5383fc824266de1f9606293
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 3b6685e00837a4aaf809813e62b7e52823bc07a9
>   samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala 1a2dd4413f56e53dbeeb47b5637d7b0c50522f02
>   samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala adef09e15c666cb2dbb2e4c5507fc2d605b82a1e
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 1ca9e2cc5673c537b6a48224809847e94da81fca
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 275eb1a924d09a0a43efe6273e0d2af9217e1c74
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala c14f2f623bb4bae911dd3085ce428175930e4545
>   samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala 10986a49b39cda703a0e54688dc914f2465c79c9
>   samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala 635c3531a897a369c813821f7b901186e1281ed1
>   samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala 0b720ec4dd83c71fd1ce5071571c7a10babf0ddc
>   samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b80d34953a54ada461ed1d4b0dcfa07f4435f877
>   samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 530255e5866bc49ec5ce1a0b7437470cd4e17010
>   samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala 744eec05857a4ea14c718e3750fb575d3678b1f8
>   samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala ec1d749cb5186f788b402877996a4caa37e99362
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 1a67586eeec95dabfeb3b6881af9b3865c3029ca
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java 76bc6810a3162a1dc58a36033b3b1f75616bd6ca
>   samza-core/src/test/resources/test.properties 9348c7de956ebf02f58a163dc6fb391a7e29ae64
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala af800dfeedbfea75abaac3f15fd53bc55b743daf
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala d18d4c45c5de3b50a24d6c776364e1f589db8f4d
>   samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala 10ff1f437220b38810f8a32903cc72df20f206d3
>   samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 81742bc93bcbca5a2ed43c701bcd5d3f40d79bfe
>   samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala 1ee5c06fbd1be5e4ce944a16454c8bd32459d395
>   samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala a8e5d36921464a2e36693279e8083e4544c4e289
>   samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala 9a8e1450a6cc14713817f719cfa56a0e5c97a6f6
>   samza-core/src/test/scala/org/apache/samza/serializers/TestJsonSerde.scala 60460713a2d4f7b7b389f21c1450d45c1afaa0f4
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala 5d8ee4fa74be9ba36956c11ae33573be2d2d5826
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala 4a1b31f025ba7b05a7b46041aa8e12074599ce24
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala 7fc6d89b5d703a7c10a212aaa8d3f9231996b897
>   samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 02a6ea94daf9eb597c9ecef5d63062df5861efc8
>   samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala a34c3f2738855dbf3737639c33846fcad23bd3b9
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b790be17cfe08da28220ffb381cbd618ebe25cf0
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala 4f15002325bc0154991f9a35312e903d15ef81e7
>   samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala b76d5ad68640908bef552125d405b467386025f8
>   samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala 7d4bea8398794c2325f9c022074303a83cfb164a
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala 3d1e6ecbb3fd95816c722a68c4f5907120eb20d0
>   samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java 4ef3551f470e77e27bd156e81ce96486f25c21bf
>   samza-test/src/main/config/join/common.properties ad10aac090beb072ecce3546f06279a7a6113970
>   samza-test/src/main/config/perf/container-performance.properties 86b2d58e2d4a9d6bcc9bd305bb7d735f7d1dd053
>   samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java c0a20af5a2f4329ad4a2cff378ced3bececbc1cb
>   samza-test/src/main/python/samza_failure_testing.py 198db26528cab8b473f794a922848a60299dc825
>   samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala 45c76e86361b9e1a54cb5fc717126d36b64cf7e8
>   samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala a8b724bf781003142e455fdf1fed2f13d6c18353
>   samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala 03395e2efa0fec723e354177d06bfacf7d2a9215
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala a1dbe0435ae08c710d4bfc871458ed386e275cd2
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala 8ba435ef2ccf2af64d01eb4bc3b1c362fb03779d
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala b0b6543856cb87888c5a719182ad9576b51bba1a
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 24b11da06a69da734c85720ef39d65ee46d821d5
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala 765f72f4c10bd0f1d1adab28c8ec54d9cbea5fb4
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala 81dea9d6d1921462b200c62dbdf016c0eb2f01b2
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 58f2464211a1fb7ff40f5978fd41f64d088002d0
> 
> Diff: https://reviews.apache.org/r/32147/diff/
> 
> 
> Testing
> -------
> 
> The patch does the following:
> 1. Removes all system-specific checkpoint managers and replaces them with a
> single entity that uses the coordinator stream to do the checkpointing.
> 2. Decouples the changelog from the checkpoint manager; a new component called
> the changelog manager handles it.
> 3. Passes the checkpoint information to the containers from the JobCoordinator.
> 4. Modifies the OffsetManager to use the new checkpoint manager and the
> starting offsets from the JobCoordinator.
> 
> 
> Tests:
> All existing unit tests and Zopkio tests pass. I have changed some of the
> existing unit tests as well.
> I plan to add one more unit test to verify checkpoint persistence (a stub is
> present in TestJobCoordinator).
> 
> Pending Issues:
> 1. The metrics registry is not used correctly right now; I need to pass the
> right reference to it.
> 2. The coordinator stream does not use the same consumer and producer as the
> "systemconsumers" and "systemproducers" we have (this will be done after
> SAMZA-567).
> 3. Remove the checkpoint configs related to the stream, migrate them to the
> coordinator stream, and document them.
> 
> 
> Thanks,
> 
> Naveen Somasundaram
> 
>
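
For anyone following the ChangelogManager and CheckpointManager replies at the top of this mail, here is a minimal Java sketch of the two filtering steps they describe: reading only messages of the requested types from the bootstrapped coordinator stream, and a contains check that ignores checkpoints for tasks nobody registered. Every class, method, and message-type name below is hypothetical and chosen for illustration; none of it is taken from the patch, and a real message would carry more than a single offset string.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch only; the names here do not come from the patch.
final class CheckpointFilterSketch {

    /** Simplified stand-in for a message read from the coordinator stream. */
    static final class Message {
        final String type;     // made-up type label, e.g. "checkpoint"
        final String taskName; // task the message belongs to
        final String offset;   // payload, reduced to one string for brevity

        Message(String type, String taskName, String offset) {
            this.type = type;
            this.taskName = taskName;
            this.offset = offset;
        }
    }

    /** Returns only the messages whose type is in {@code types}, in stream order. */
    static List<Message> filterByTypes(List<Message> bootstrapped, Set<String> types) {
        List<Message> result = new ArrayList<>();
        for (Message m : bootstrapped) {
            if (types.contains(m.type)) {
                result.add(m);
            }
        }
        return result;
    }

    /** Keeps the latest offset per registered task and skips everything else. */
    static Map<String, String> latestCheckpoints(List<Message> checkpoints, Set<String> registered) {
        Map<String, String> latest = new HashMap<>();
        for (Message m : checkpoints) {
            if (registered.contains(m.taskName)) {
                latest.put(m.taskName, m.offset); // later messages overwrite earlier ones
            }
            // else: checkpoint for an unregistered task, intentionally ignored
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Message> stream = Arrays.asList(
            new Message("checkpoint", "task-0", "10"),
            new Message("changelog-mapping", "task-0", "3"),
            new Message("checkpoint", "task-1", "7"),
            new Message("checkpoint", "task-0", "12"));

        List<Message> checkpoints = filterByTypes(stream, Collections.singleton("checkpoint"));
        Map<String, String> latest = latestCheckpoints(checkpoints, Collections.singleton("task-0"));
        System.out.println(latest);
    }
}
```

Keeping the type filter in the consumer and the task filter in the manager mirrors the split described in the replies, but the exact boundary in the patch may differ.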