-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32147/
-----------------------------------------------------------

(Updated April 1, 2015, 9:02 p.m.)


Review request for samza.


Changes
-------

Rebased onto master, addressed previous review comments, added a retry loop for
reconnecting to the job coordinator, and added metrics for the coordinator.
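The retry loop mentioned above can be pictured with a generic sketch. It is not the code in this patch: `RetryingFetch`, `withRetries`, the exponential backoff and its 60-second cap are hypothetical choices. The lambda in `main` only simulates a job coordinator that is unreachable for the first two attempts, in place of a real request for the job model.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

/** Hypothetical helper: retry a call to the job coordinator with exponential backoff. */
public final class RetryingFetch {

    /** Calls {@code task} up to {@code maxAttempts} times, sleeping between failed attempts. */
    public static <T> T withRetries(Callable<T> task, int maxAttempts, long initialBackoffMs)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long backoffMs = initialBackoffMs;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;
                if (attempt == maxAttempts) {
                    break; // out of attempts: rethrow the last failure below
                }
                Thread.sleep(backoffMs);
                backoffMs = Math.min(backoffMs * 2, 60_000L); // assumed cap of 60 s
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated coordinator (assumption): fails twice, then returns a job model payload.
        String model = withRetries(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new IOException("coordinator not reachable");
            }
            return "{\"containers\":{}}";
        }, 5, 10L);
        System.out.println(model + " after " + calls[0] + " attempts");
    }
}
```

Bounding the number of attempts keeps a container from hanging forever when the coordinator is gone for good, while the backoff avoids hammering a coordinator that is still starting up.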


Repository: samza


Description
-------

SAMZA-465


Diffs (updated)
-----

  build.gradle 97de3a28f6379e3862eec845da87587b1d4f742e 
  samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java  
  samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java 092cb910b40d312217e86420bf1ddfbaf605e9e5
  samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java a97ff0919d8205928efee1a2a20780659180849d
  samza-api/src/main/java/org/apache/samza/container/TaskName.java 083358686fc69ab45bbc73e898f419224ebc3a9f
  samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 8995ba30c823bddcdfd3af7100e1440e71ef7998
  samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java 6ff1a555f3c48e416bb78e94c5df71eff0a27f3d
  samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 01997ae22641b735cd452a0e89a49219e2874892
  samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java eb22d2ec5f09ca59790e2871d9bff9745fe925dc
  samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java 7dc431c74a3fc2ba80eb47d6c5d87524cb4c9bde
  samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java 3517912eaafbf95f8c8cc70ab5869548a56b76e7
  samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java PRE-CREATION
  samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala ddc30af7c52d8a4d5c5de02f6757c040b1f31c93
  samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala a40c87fa7865746a5612c55a4cf24c8d005be7e0
  samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala 2a87a6e0cef72179b5383fc824266de1f9606293
  samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 3b6685e00837a4aaf809813e62b7e52823bc07a9
  samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala 1a2dd4413f56e53dbeeb47b5637d7b0c50522f02
  samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala adef09e15c666cb2dbb2e4c5507fc2d605b82a1e
  samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 1ca9e2cc5673c537b6a48224809847e94da81fca
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 87fcf589b10fd952df630d498c645159ed381739
  samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala c14f2f623bb4bae911dd3085ce428175930e4545
  samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala 10986a49b39cda703a0e54688dc914f2465c79c9
  samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala 635c3531a897a369c813821f7b901186e1281ed1
  samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala PRE-CREATION
  samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala 0b720ec4dd83c71fd1ce5071571c7a10babf0ddc
  samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b80d34953a54ada461ed1d4b0dcfa07f4435f877
  samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 530255e5866bc49ec5ce1a0b7437470cd4e17010
  samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala 744eec05857a4ea14c718e3750fb575d3678b1f8
  samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala ec1d749cb5186f788b402877996a4caa37e99362
  samza-core/src/main/scala/org/apache/samza/util/Util.scala 1a67586eeec95dabfeb3b6881af9b3865c3029ca
  samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java PRE-CREATION
  samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java PRE-CREATION
  samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java PRE-CREATION
  samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java PRE-CREATION
  samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java PRE-CREATION
  samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java 76bc6810a3162a1dc58a36033b3b1f75616bd6ca
  samza-core/src/test/resources/test.properties 9348c7de956ebf02f58a163dc6fb391a7e29ae64
  samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala af800dfeedbfea75abaac3f15fd53bc55b743daf
  samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala d18d4c45c5de3b50a24d6c776364e1f589db8f4d
  samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala 10ff1f437220b38810f8a32903cc72df20f206d3
  samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 81742bc93bcbca5a2ed43c701bcd5d3f40d79bfe
  samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala 1ee5c06fbd1be5e4ce944a16454c8bd32459d395
  samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala a8e5d36921464a2e36693279e8083e4544c4e289
  samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala 9a8e1450a6cc14713817f719cfa56a0e5c97a6f6
  samza-core/src/test/scala/org/apache/samza/serializers/TestJsonSerde.scala 60460713a2d4f7b7b389f21c1450d45c1afaa0f4
  samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala 5d8ee4fa74be9ba36956c11ae33573be2d2d5826
  samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala c9504ecf1d1edbc116ca6d794678062fddeee7fa
  samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala 7fc6d89b5d703a7c10a212aaa8d3f9231996b897
  samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 02a6ea94daf9eb597c9ecef5d63062df5861efc8
  samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala a34c3f2738855dbf3737639c33846fcad23bd3b9
  samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala f783c5790f442928ea83b13359ac6b2a5bcb02e5
  samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala 4f15002325bc0154991f9a35312e903d15ef81e7
  samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala b76d5ad68640908bef552125d405b467386025f8
  samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala 7d4bea8398794c2325f9c022074303a83cfb164a
  samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala 0380d35121f4feb99efc7d092b4232030d12db01
  samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java d3f25c0e03a727e64a774581384ef5aae9ef9c1c
  samza-test/src/main/config/join/common.properties ad10aac090beb072ecce3546f06279a7a6113970
  samza-test/src/main/config/perf/container-performance.properties 86b2d58e2d4a9d6bcc9bd305bb7d735f7d1dd053
  samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java c0a20af5a2f4329ad4a2cff378ced3bececbc1cb
  samza-test/src/main/python/samza_failure_testing.py 198db26528cab8b473f794a922848a60299dc825
  samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala 45c76e86361b9e1a54cb5fc717126d36b64cf7e8
  samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala d66b3bd070a4cef4b1d3dded1d79a33cbe3fa09b
  samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala 03395e2efa0fec723e354177d06bfacf7d2a9215
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala a1dbe0435ae08c710d4bfc871458ed386e275cd2
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala 8ba435ef2ccf2af64d01eb4bc3b1c362fb03779d
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala b0b6543856cb87888c5a719182ad9576b51bba1a
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 24b11da06a69da734c85720ef39d65ee46d821d5
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala 765f72f4c10bd0f1d1adab28c8ec54d9cbea5fb4
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala 81dea9d6d1921462b200c62dbdf016c0eb2f01b2
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 58f2464211a1fb7ff40f5978fd41f64d088002d0

Diff: https://reviews.apache.org/r/32147/diff/


Testing
-------

The patch does the following:
1. Removes all system-specific checkpoint managers and replaces them with a
single checkpoint manager that uses the coordinator stream for checkpointing.
2. Decouples the changelog from the checkpoint manager; a new component, the
changelog manager (ChangelogPartitionManager), takes over the changelog
partition mapping.
3. Passes checkpoint information from the job coordinator to the containers.
4. Modifies the offset manager to use the new checkpoint manager and the
starting offsets provided by the job coordinator.
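The four steps above can be pictured with a small in-memory model. This is a hypothetical sketch, not the patch's CoordinatorStreamSystemProducer/CoordinatorStreamSystemConsumer API: `CoordinatorStreamSketch`, the message types `set-checkpoint` and `set-changelog`, and the partition key `kafka.PageViews.0` are invented for illustration. It assumes the coordinator stream behaves like a keyed log in which the latest message per (type, task) wins, so the job coordinator can replay it once and hand each container its starting offsets.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of checkpointing over a coordinator stream. */
public final class CoordinatorStreamSketch {
    // Separate message types keep checkpoints and changelog mapping decoupled.
    public static final String SET_CHECKPOINT = "set-checkpoint";
    public static final String SET_CHANGELOG = "set-changelog";

    /** One keyed message on the stream: (type, taskName) -> value. */
    static final class Message {
        final String type;
        final String taskName;
        final Map<String, String> value;

        Message(String type, String taskName, Map<String, String> value) {
            this.type = type;
            this.taskName = taskName;
            this.value = Collections.unmodifiableMap(new HashMap<>(value));
        }
    }

    private final List<Message> stream = new ArrayList<>();

    /** Checkpoint manager side: append the latest offsets for a task. */
    public void writeCheckpoint(String taskName, Map<String, String> offsets) {
        stream.add(new Message(SET_CHECKPOINT, taskName, offsets));
    }

    /** Changelog manager side: record which changelog partition a task owns. */
    public void writeChangelogPartition(String taskName, int partition) {
        stream.add(new Message(SET_CHANGELOG, taskName,
                Collections.singletonMap("partition", Integer.toString(partition))));
    }

    /** Job coordinator side: replay the stream; the last message per task wins. */
    public Map<String, Map<String, String>> latestByTask(String type) {
        Map<String, Map<String, String>> latest = new HashMap<>();
        for (Message m : stream) {
            if (m.type.equals(type)) {
                latest.put(m.taskName, m.value);
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        CoordinatorStreamSketch cs = new CoordinatorStreamSketch();
        cs.writeChangelogPartition("Partition 0", 0);
        cs.writeCheckpoint("Partition 0", Collections.singletonMap("kafka.PageViews.0", "100"));
        cs.writeCheckpoint("Partition 0", Collections.singletonMap("kafka.PageViews.0", "250"));

        // Starting offsets the coordinator would pass to the container running "Partition 0".
        System.out.println(cs.latestByTask(SET_CHECKPOINT).get("Partition 0")); // {kafka.PageViews.0=250}
        System.out.println(cs.latestByTask(SET_CHANGELOG).get("Partition 0"));  // {partition=0}
    }
}
```

Because older checkpoints stay in the log and are simply superseded on replay, a compacted topic can discard them without changing what the coordinator reads.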


Tests:
All existing unit tests and Zopkio tests pass; I have also modified some of the
existing unit tests.
I plan to add one more unit test to verify checkpoint persistence (a stub is
present in TestJobCoordinator).

Pending Issues:
1. The metrics registry is currently not used correctly; I need to pass the
right reference to it.
2. The coordinator stream does not yet use the same consumer and producer as the
"systemconsumers" and "systemproducers" we already have (this will be done after
SAMZA-567).
3. Remove the stream-related checkpoint configs, migrate them to the
coordinator stream, and document them.


Thanks,

Naveen Somasundaram
