> On April 29, 2015, 7:30 p.m., Yan Fang wrote:
> > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java,
> >  line 436
> > <https://reviews.apache.org/r/32147/diff/7/?file=922533#file922533line436>
> >
> >     typo, SetChangelogMapping?

I'll fix this when I update the Doc.


> On April 29, 2015, 7:30 p.m., Yan Fang wrote:
> > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java,
> >  line 444
> > <https://reviews.apache.org/r/32147/diff/7/?file=922533#file922533line444>
> >
> >     typo: they -> the

I will fix this when I update the Doc.


> On April 29, 2015, 7:30 p.m., Yan Fang wrote:
> > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java,
> >  lines 132-133
> > <https://reviews.apache.org/r/32147/diff/7/?file=922534#file922534line132>
> >
> >     this doc is a little vague. Though it only pays attention to the config 
> > messages, actually it's also used by another method (getBootstrappedStream) 
> > that is called from CheckpointManager.

I agree, this requires cleanup; I'll address it along with the docs.


> On April 29, 2015, 7:30 p.m., Yan Fang wrote:
> > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java,
> >  line 149
> > <https://reviews.apache.org/r/32147/diff/7/?file=922534#file922534line149>
> >
> >     Will this bootstrappedStreamSet overflow or take a lot of memory ? 
> > Because we are putting all the messages in the stream to this map. When we 
> > consider the checkpoint msg, it can be huge.

It is true that we'll have multiple checkpoint messages, but the set won't 
grow large, for two reasons:
1. The coordinator stream will be log compacted.
2. The consumer reads from the oldest message to the latest, and a duplicate 
overwrites the existing entry in the HashSet. That is, the set should only 
hold the latest checkpoint (one per task); all previous checkpoints are 
overwritten as newer ones are read.
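
To illustrate point 2, here is a minimal, self-contained Java sketch of the 
"latest entry per task wins" behavior. It is not the code from the patch: 
`LatestCheckpointSketch`, `CheckpointMessage`, and `bootstrap` are hypothetical 
names, and the message is reduced to a task name and an offset. Note that a 
plain `java.util.HashSet.add` leaves an already-present equal element in place, 
so a set-based version would need a remove followed by an add; the sketch uses 
a map keyed by task name, where `put` does replace the previous value.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class LatestCheckpointSketch {
    /** Hypothetical stand-in for a checkpoint message: task name plus offset. */
    static final class CheckpointMessage {
        final String taskName;
        final long offset;

        CheckpointMessage(String taskName, long offset) {
            this.taskName = taskName;
            this.offset = offset;
        }
    }

    /** Replays the stream oldest-first; a later message replaces the earlier one for the same task. */
    static Map<String, CheckpointMessage> bootstrap(List<CheckpointMessage> oldestToLatest) {
        Map<String, CheckpointMessage> latestByTask = new LinkedHashMap<>();
        for (CheckpointMessage message : oldestToLatest) {
            latestByTask.put(message.taskName, message); // put() overwrites the previous value
        }
        return latestByTask;
    }

    public static void main(String[] args) {
        List<CheckpointMessage> stream = Arrays.asList(
            new CheckpointMessage("Partition 0", 10L),
            new CheckpointMessage("Partition 1", 7L),
            new CheckpointMessage("Partition 0", 25L),
            new CheckpointMessage("Partition 0", 40L));

        Map<String, CheckpointMessage> latest = bootstrap(stream);
        // Four messages were read, but only one entry per task is retained.
        System.out.println(latest.size() + " entries; Partition 0 offset = "
            + latest.get("Partition 0").offset);
    }
}
```

Under these assumptions the retained state is bounded by the number of tasks, 
not by the number of checkpoint messages ever written.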


> On April 29, 2015, 7:30 p.m., Yan Fang wrote:
> > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java,
> >  line 176
> > <https://reviews.apache.org/r/32147/diff/7/?file=922534#file922534line176>
> >
> >     check if (isBoostrapped)? The same question in "public Checkpoint 
> > readLastCheckpoint(TaskName taskName) "

Same answer as the previous one: we need to bootstrap each time to guarantee 
correctness for failed containers.


> On April 29, 2015, 7:30 p.m., Yan Fang wrote:
> > samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java,
> >  lines 60-68
> > <https://reviews.apache.org/r/32147/diff/7/?file=922539#file922539line60>
> >
> >     actually, we do not need the consumer after bootstrapping, right? Do we 
> > stop it?

We use the same consumer for both the checkpoint and the changelog. The 
checkpoint consumer needs to live for the entire lifespan of the job, hence 
the coordinator consumer will also live for the entire lifespan of the job 
(same logic as in the comment on why we keep bootstrapping each time).


> On April 29, 2015, 7:30 p.m., Yan Fang wrote:
> > samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java,
> >  line 89
> > <https://reviews.apache.org/r/32147/diff/7/?file=922539#file922539line89>
> >
> >     i think the same question, do we need to bootstrap again for the 
> > changelog?

Same as the previous answer.


> On April 29, 2015, 7:30 p.m., Yan Fang wrote:
> > samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java,
> >  line 104
> > <https://reviews.apache.org/r/32147/diff/7/?file=922539#file922539line104>
> >
> >     same question i raised, how do we differentiate more than one store?

Same as the previous answer.


> On April 29, 2015, 7:30 p.m., Yan Fang wrote:
> > samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala, line 108
> > <https://reviews.apache.org/r/32147/diff/7/?file=922543#file922543line108>
> >
> >     use CHECKPOINT_SEGMENT_BYTES ?

That config is removed (along with the class) because the checkpoint topic no 
longer exists; it is replaced by the coordinator stream.


> On April 29, 2015, 7:30 p.m., Yan Fang wrote:
> > samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala, 
> > line 340
> > <https://reviews.apache.org/r/32147/diff/7/?file=922547#file922547line340>
> >
> >     comment for this? a little confusing.

Agreed, I'll add more clarifying comments here explaining why we need all 
offsets.


> On April 29, 2015, 7:30 p.m., Yan Fang wrote:
> > samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala, lines 83-92
> > <https://reviews.apache.org/r/32147/diff/7/?file=922552#file922552line83>
> >
> >     Will the old config be overwritten when we produce the new config?

Correct. This is the code that deletes the delta (the keys present in the old 
config but absent from the new one):
```
    (oldConfig.keySet -- config.keySet).foreach(key => {
      coordinatorSystemProducer.send(
        new CoordinatorStreamMessage.Delete(JobRunner.SOURCE, key, SetConfig.TYPE))
    })
```
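
For illustration only, the same set difference can be sketched in plain Java, 
without the coordinator stream. `ConfigDeltaSketch` and `keysToDelete` are 
hypothetical names, and the config keys and values are made-up examples; the 
sketch only shows which keys would receive a delete message, not how the 
message is sent.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class ConfigDeltaSketch {
    /** Keys present in the old config but absent from the new one. */
    static Set<String> keysToDelete(Map<String, String> oldConfig, Map<String, String> newConfig) {
        // Copy the key set so the caller's map is not modified by removeAll().
        Set<String> stale = new TreeSet<>(oldConfig.keySet());
        stale.removeAll(newConfig.keySet());
        return stale;
    }

    public static void main(String[] args) {
        Map<String, String> oldConfig = new HashMap<>();
        oldConfig.put("job.name", "example");
        oldConfig.put("task.window.ms", "1000");

        Map<String, String> newConfig = new HashMap<>();
        newConfig.put("job.name", "example");
        newConfig.put("task.commit.ms", "60000");

        // Only task.window.ms disappeared, so only it would get a delete message.
        System.out.println(keysToDelete(oldConfig, newConfig));
    }
}
```

Keys that exist in both configs are left out of the delta, and keys that exist 
only in the new config are not part of it either.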


> On April 29, 2015, 7:30 p.m., Yan Fang wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 172
> > <https://reviews.apache.org/r/32147/diff/7/?file=922557#file922557line172>
> >
> >     typo: jobd -> job; format is of -> format of

Will fix.


- Naveen


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32147/#review81995
-----------------------------------------------------------


On April 10, 2015, 3:13 a.m., Naveen Somasundaram wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32147/
> -----------------------------------------------------------
> 
> (Updated April 10, 2015, 3:13 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> SAMZA 465
> 
> 
> Diffs
> -----
> 
>   build.gradle 97de3a28f6379e3862eec845da87587b1d4f742e 
>   samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java  
>   samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java 
> 092cb910b40d312217e86420bf1ddfbaf605e9e5 
>   
> samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java
>  a97ff0919d8205928efee1a2a20780659180849d 
>   samza-api/src/main/java/org/apache/samza/container/TaskName.java 
> 083358686fc69ab45bbc73e898f419224ebc3a9f 
>   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 
> 8995ba30c823bddcdfd3af7100e1440e71ef7998 
>   samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java 
> 6ff1a555f3c48e416bb78e94c5df71eff0a27f3d 
>   
> samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
>  01997ae22641b735cd452a0e89a49219e2874892 
>   samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
>  PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java 
> eb22d2ec5f09ca59790e2871d9bff9745fe925dc 
>   
> samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
>  7dc431c74a3fc2ba80eb47d6c5d87524cb4c9bde 
>   
> samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
>  3517912eaafbf95f8c8cc70ab5869548a56b76e7 
>   
> samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
>  PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala 
> ddc30af7c52d8a4d5c5de02f6757c040b1f31c93 
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 
> a40c87fa7865746a5612c55a4cf24c8d005be7e0 
>   
> samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
>  2a87a6e0cef72179b5383fc824266de1f9606293 
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 
> 3b6685e00837a4aaf809813e62b7e52823bc07a9 
>   samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala 
> 1a2dd4413f56e53dbeeb47b5637d7b0c50522f02 
>   samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala 
> adef09e15c666cb2dbb2e4c5507fc2d605b82a1e 
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 
> 1ca9e2cc5673c537b6a48224809847e94da81fca 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 5416dd620b2f65ffb09cf5f8c07b1a547df82bab 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
> c14f2f623bb4bae911dd3085ce428175930e4545 
>   
> samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala
>  10986a49b39cda703a0e54688dc914f2465c79c9 
>   
> samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
>  635c3531a897a369c813821f7b901186e1281ed1 
>   
> samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala
>  PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala 
> 0b720ec4dd83c71fd1ce5071571c7a10babf0ddc 
>   
> samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala 
> b80d34953a54ada461ed1d4b0dcfa07f4435f877 
>   samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 
> 530255e5866bc49ec5ce1a0b7437470cd4e17010 
>   samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala 
> 744eec05857a4ea14c718e3750fb575d3678b1f8 
>   
> samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
>  ec1d749cb5186f788b402877996a4caa37e99362 
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 
> 1a67586eeec95dabfeb3b6881af9b3865c3029ca 
>   
> samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
>  PRE-CREATION 
>   
> samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
>  PRE-CREATION 
>   
> samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java
>  PRE-CREATION 
>   
> samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
>  PRE-CREATION 
>   
> samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
>  PRE-CREATION 
>   
> samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
>  76bc6810a3162a1dc58a36033b3b1f75616bd6ca 
>   samza-core/src/test/resources/test.properties 
> 9348c7de956ebf02f58a163dc6fb391a7e29ae64 
>   
> samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
>  af800dfeedbfea75abaac3f15fd53bc55b743daf 
>   
> samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 
> d18d4c45c5de3b50a24d6c776364e1f589db8f4d 
>   
> samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala
>  10ff1f437220b38810f8a32903cc72df20f206d3 
>   
> samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
> cab31ca6452e45c73808f20b12d39a30c117119a 
>   
> samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
>  1ee5c06fbd1be5e4ce944a16454c8bd32459d395 
>   
> samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
>  a8e5d36921464a2e36693279e8083e4544c4e289 
>   
> samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala
>  9a8e1450a6cc14713817f719cfa56a0e5c97a6f6 
>   samza-core/src/test/scala/org/apache/samza/serializers/TestJsonSerde.scala 
> 60460713a2d4f7b7b389f21c1450d45c1afaa0f4 
>   
> samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala
>  5d8ee4fa74be9ba36956c11ae33573be2d2d5826 
>   
> samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
>  c9504ecf1d1edbc116ca6d794678062fddeee7fa 
>   
> samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
>  3dfa26a23c88f9b5b8ffbfd64b59c4061a8ed2e7 
>   samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 
> 02a6ea94daf9eb597c9ecef5d63062df5861efc8 
>   
> samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala 
> a34c3f2738855dbf3737639c33846fcad23bd3b9 
>   
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
>  f783c5790f442928ea83b13359ac6b2a5bcb02e5 
>   
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
>  c84ceb75b84eb6d2ce115396ba54cf9e455d905c 
>   
> samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala
>  b76d5ad68640908bef552125d405b467386025f8 
>   
> samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
>  7d4bea8398794c2325f9c022074303a83cfb164a 
>   
> samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
>  0380d35121f4feb99efc7d092b4232030d12db01 
>   
> samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java 
> d3f25c0e03a727e64a774581384ef5aae9ef9c1c 
>   samza-test/src/main/config/join/common.properties 
> ad10aac090beb072ecce3546f06279a7a6113970 
>   samza-test/src/main/config/negate-number.properties 
> b9f898c745250252461c833adb05e24ece2d4a89 
>   samza-test/src/main/config/perf/container-performance.properties 
> 86b2d58e2d4a9d6bcc9bd305bb7d735f7d1dd053 
>   samza-test/src/main/config/perf/kafka-read-write-performance.properties 
> 122b14aaecc2d221ac8944d04d508e7f83ede5ab 
>   samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java 
> c0a20af5a2f4329ad4a2cff378ced3bececbc1cb 
>   samza-test/src/main/python/samza_failure_testing.py 
> 198db26528cab8b473f794a922848a60299dc825 
>   
> samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
>  45c76e86361b9e1a54cb5fc717126d36b64cf7e8 
>   
> samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
>  d66b3bd070a4cef4b1d3dded1d79a33cbe3fa09b 
>   samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala 
> 03395e2efa0fec723e354177d06bfacf7d2a9215 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala 
> a1dbe0435ae08c710d4bfc871458ed386e275cd2 
>   
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala 
> 8ba435ef2ccf2af64d01eb4bc3b1c362fb03779d 
>   
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
>  b0b6543856cb87888c5a719182ad9576b51bba1a 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 
> 24b11da06a69da734c85720ef39d65ee46d821d5 
>   
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
>  765f72f4c10bd0f1d1adab28c8ec54d9cbea5fb4 
>   
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
>  81dea9d6d1921462b200c62dbdf016c0eb2f01b2 
>   
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
>  58f2464211a1fb7ff40f5978fd41f64d088002d0 
> 
> Diff: https://reviews.apache.org/r/32147/diff/
> 
> 
> Testing
> -------
> 
> The patch does the following:
> 1. Removes all system-specific checkpoint managers and replaces them with a 
> single entity that uses the coordinator stream to do the checkpointing.
> 2. Decouples the changelog from the checkpoint manager; a new component 
> called the changelog manager handles it.
> 3. Passes the checkpoint information to the containers from the job 
> coordinator.
> 4. Modifies the offset manager to use the new checkpoint manager and the 
> starting offsets from the job coordinator.
> 
> 
> Tests:
> All existing Unit tests and Zopkio tests pass, I have changed some of 
> existing Unit tests as well.
> Plan to add one more Unit test to verify checkpoint persistence (Stub present 
> in TestJobCoordinator)
> 
> Pending Issues:
> 1. The metrics registry is right now not used correctly, I need to pass the 
> right reference to it
> 2. The coordinator stream does not use the same consumer and producer from 
> the "systemconsumers" and "systemproducers" we have (This will be done after 
> SAMZA-567)
> 3. Remove checkpoint configs related to the stream, migrate them to the 
> coordinator stream and document them.
> 
> 
> Thanks,
> 
> Naveen Somasundaram
> 
>
