-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/22215/#review47575
-----------------------------------------------------------


Please update the SamzaContainer, Streams, and Checkpointing sections of the website docs.


samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
<https://reviews.apache.org/r/22215/#comment83621>

    We can't commit the code like this; either remove it or fix it. When we last spoke, I think you said fixing it shouldn't be too bad with the new changes.



samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/22215/#comment83622>

    state log -> changelog



samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/22215/#comment83623>

    Rather than having a def that defines and calls two other defs, it seems simpler to just have:
    
    SamzaContainer.buildTaskNameToSystemStreamPartition
    SamzaContainer.buildTaskNameToChangeLogPartitionMapping
    
    and have SamzaContainer.main call them. I'd also prefer parameterizing these two methods and having SamzaContainer.main pass the environment variables in. That would make them more testable.
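    A minimal sketch of the suggested shape: both builders are pure functions of a string, and only main touches the environment. The encodings ("task=value;task=value"), the environment variable names, and the use of plain Strings/Ints in place of TaskName and SystemStreamPartition are all hypothetical, not the format the patch actually uses.

```scala
// Hypothetical sketch: the string encodings and env var names below are assumptions.
object SamzaContainerSketch {
  // Hypothetical format: "Partition 0=kafka.events.0,kafka.clicks.0;Partition 1=kafka.events.1"
  def buildTaskNameToSystemStreamPartition(encoded: String): Map[String, Set[String]] =
    parsePairs(encoded).map { case (task, ssps) =>
      task -> ssps.split(",").map(_.trim).filter(_.nonEmpty).toSet
    }

  // Hypothetical format: "Partition 0=0;Partition 1=1"
  def buildTaskNameToChangeLogPartitionMapping(encoded: String): Map[String, Int] =
    parsePairs(encoded).map { case (task, partition) => task -> partition.trim.toInt }

  // Splits "k=v;k=v" into a map; fails fast on malformed entries.
  private def parsePairs(s: String): Map[String, String] =
    s.split(";").map(_.trim).filter(_.nonEmpty).map { entry =>
      val idx = entry.indexOf("=")
      require(idx > 0, s"malformed entry: $entry")
      entry.substring(0, idx).trim -> entry.substring(idx + 1)
    }.toMap

  def main(args: Array[String]): Unit = {
    // main is the only place that reads the environment (variable names are hypothetical).
    val sspMapping = buildTaskNameToSystemStreamPartition(sys.env.getOrElse("SAMZA_SSP_MAPPING", ""))
    val changeLogMapping = buildTaskNameToChangeLogPartitionMapping(sys.env.getOrElse("SAMZA_CHANGELOG_MAPPING", ""))
    println(s"tasks: ${sspMapping.keySet}, changelog partitions: $changeLogMapping")
  }
}
```

    Because the builders take plain arguments, a unit test can call them directly without faking environment variables.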



samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/22215/#comment83624>

    SSP -> SystemStreamPartition



samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment83633>

    This is pretty ugly, and you don't do this for the taskName key. Can we either rename this variable and copy the same pattern for the taskName key, or eliminate this variable and hard-code "type" in the map?
    
    e.g.
    
            val key = Map(CHECKPOINT_KEY_KEY -> CHECKPOINT_KEY_TYPE, "taskName" -> taskName.getTaskName)
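    A minimal sketch of the two options side by side, assuming CHECKPOINT_KEY_KEY holds "type" (as the suggestion to hard-code "type" implies). The names TYPE_KEY and TASK_NAME_KEY, and "checkpoint" as the value of CHECKPOINT_KEY_TYPE, are hypothetical.

```scala
// Hypothetical constants: TYPE_KEY and TASK_NAME_KEY are illustrative renames, and
// "checkpoint" is an assumed value for CHECKPOINT_KEY_TYPE.
object CheckpointKeySketch {
  val TYPE_KEY = "type"
  val TASK_NAME_KEY = "taskName"
  val CHECKPOINT_KEY_TYPE = "checkpoint"

  // Option 1: every key name comes from a constant, so both keys follow one pattern.
  def withConstants(taskName: String): Map[String, String] =
    Map(TYPE_KEY -> CHECKPOINT_KEY_TYPE, TASK_NAME_KEY -> taskName)

  // Option 2: both key names are literals; only the type value stays a constant.
  def hardCoded(taskName: String): Map[String, String] =
    Map("type" -> CHECKPOINT_KEY_TYPE, "taskName" -> taskName)
}
```

    Both build the same Map, so the choice is purely about consistency; what matters is that the two keys are treated the same way.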
    



samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment83626>

    White space.



samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment83630>

    Should we put a version number in either the key or the checkpoint object itself? A version number in the key will affect compaction, and I'm not sure whether that's good or not. We should think this through.
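    To make the compaction trade-off concrete, here is a toy model that treats compaction as "keep the last value written for each key". This is a simplification (real Kafka compaction runs in the background and only guarantees that at least the latest value per key survives), and the keys and payloads are made up.

```scala
// Toy model of log compaction: the last write per key wins.
object CompactionSketch {
  def compact[K, V](log: Seq[(K, V)]): Map[K, V] =
    log.foldLeft(Map.empty[K, V]) { case (acc, (k, v)) => acc + (k -> v) }

  // Version in the key: a v1 key and a v2 key for the same task are distinct keys.
  val versionInKey: Seq[((String, String), String)] = Seq(
    (("v1", "Partition 0"), "offsets-a"),
    (("v1", "Partition 0"), "offsets-b"),
    (("v2", "Partition 0"), "offsets-c"))

  // Version in the value: one key per task; the version travels with the payload.
  val versionInValue: Seq[(String, (Int, String))] = Seq(
    ("Partition 0", (1, "offsets-a")),
    ("Partition 0", (1, "offsets-b")),
    ("Partition 0", (2, "offsets-c")))
}
```

    With the version in the key, the last v1 entry survives compaction alongside the v2 entry and stays in the topic until someone writes a tombstone (null value) for the old key; with the version in the value, compaction collapses each task to a single entry.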



samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment83638>

    Import scala.collection.mutable and just use mutable.Map here.
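    For reference, the import that enables the short mutable.Map form is `import scala.collection.mutable`. A minimal example (the object and method names are hypothetical):

```scala
import scala.collection.mutable

object MutableImportSketch {
  // With the package imported, call sites say `mutable.Map`, which keeps it visually
  // distinct from the default immutable `Map` without spelling out the full path.
  def collect(pairs: Seq[(String, Long)]): mutable.Map[String, Long] = {
    val offsets = mutable.Map[String, Long]()
    pairs.foreach { case (k, v) => offsets(k) = v } // later values overwrite earlier ones
    offsets
  }
}
```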



samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment83627>

    White space.



samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment83628>

    White space.



samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment83637>

    Import scala.collection.mutable and just use mutable.Map here.



samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment83642>

    Remove newlines here.



samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment83629>

    I think we call readLastCheckpoint once per taskName, and each invocation seems to result in a readCheckpointsFromLog() call. If we have hundreds (or thousands) of taskNames, won't we re-read the entire checkpoint log hundreds or thousands of times, even though the result will always be the same (on job start)?
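    A minimal sketch of reading the log once and serving every per-task lookup from the cached result. The reader function is injected, and the String task names and checkpoints are simplifications; the class name is hypothetical.

```scala
// Hypothetical sketch: `readCheckpointsFromLog` stands in for the full scan of the
// checkpoint topic; here it is just a function that returns task -> checkpoint.
class CachingCheckpointReader(readCheckpointsFromLog: () => Map[String, String]) {
  private var cache: Option[Map[String, String]] = None

  // The first caller pays for the scan; every later caller reuses the result.
  private def allCheckpoints: Map[String, String] = synchronized {
    cache.getOrElse {
      val checkpoints = readCheckpointsFromLog()
      cache = Some(checkpoints)
      checkpoints
    }
  }

  def readLastCheckpoint(taskName: String): Option[String] = allCheckpoints.get(taskName)
}
```

    The cache is only valid for the startup case the comment describes; once the job starts writing new checkpoints, the cache would need to be invalidated or bypassed.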



samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment83644>

    This whole method looks like a straight copy and paste of readCheckpointsFromLog. Can we have one method that does both? They do exactly the same thing; they just grab different messages from the log and add them to different maps.
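    A minimal sketch of one shared scan, parameterized by which key type to keep and how to turn a matching message into a map entry. The LogMessage model, the "checkpoint"/"changelog" type values, and the name readChangeLogMappingFromLog for the second reader are hypothetical.

```scala
// Hypothetical model of a checkpoint-topic message: a key map with a "type" field
// plus a payload. Real messages would carry serialized bytes.
final case class LogMessage(key: Map[String, String], payload: String)

object CheckpointLogReader {
  // Single scan shared by both readers. Later messages overwrite earlier ones,
  // so the result holds the latest value per key.
  def readFromLog[K, V](log: Seq[LogMessage], keyType: String)(toEntry: LogMessage => (K, V)): Map[K, V] =
    log.filter(_.key.get("type").contains(keyType))
      .foldLeft(Map.empty[K, V])((acc, msg) => acc + toEntry(msg))

  def readCheckpointsFromLog(log: Seq[LogMessage]): Map[String, String] =
    readFromLog[String, String](log, "checkpoint")(m => m.key("taskName") -> m.payload)

  // Hypothetical name for the copy-pasted method; only the filter and the entry differ.
  def readChangeLogMappingFromLog(log: Seq[LogMessage]): Map[String, Int] =
    readFromLog[String, Int](log, "changelog")(m => m.key("taskName") -> m.payload.toInt)
}
```

    Any error handling around the real Kafka fetch would then live in one place instead of two.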



samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment83643>

    Remove newlines.



samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment83647>

    Same deal here: this is a copy and paste of writeCheckpoint. There are only about two lines of difference, plus some wording in the log line. Can you find a way to merge this logic? Much of the complexity is in handling Kafka failures, which should be shared between the two methods.
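    A minimal sketch of the merge: one private retry loop, with the two public writers supplying only the key, the payload, and the log wording. The `send` function stands in for the Kafka producer call; the retry count, backoff, and the name writeChangeLogMapping are hypothetical, not KafkaCheckpointManager's actual policy or API.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object CheckpointLogWriter {
  // Stand-in for "send this key/payload to the checkpoint topic".
  type Send = (Map[String, String], String) => Unit

  // Shared failure handling: retry up to maxAttempts, then rethrow the last error.
  @tailrec
  private def writeToLog(key: Map[String, String], payload: String, description: String,
                         send: Send, backoffMs: Long, attempt: Int = 1, maxAttempts: Int = 3): Unit =
    Try(send(key, payload)) match {
      case Success(_) => ()
      case Failure(e) if attempt < maxAttempts =>
        System.err.println(s"Failed to write $description on attempt $attempt: ${e.getMessage}. Retrying.")
        Thread.sleep(backoffMs)
        writeToLog(key, payload, description, send, backoffMs, attempt + 1, maxAttempts)
      case Failure(e) => throw e
    }

  def writeCheckpoint(taskName: String, checkpoint: String, send: Send, backoffMs: Long = 100): Unit =
    writeToLog(Map("type" -> "checkpoint", "taskName" -> taskName), checkpoint,
      s"checkpoint for $taskName", send, backoffMs)

  // Hypothetical name for the copy-pasted writer.
  def writeChangeLogMapping(taskName: String, partition: Int, send: Send, backoffMs: Long = 100): Unit =
    writeToLog(Map("type" -> "changelog", "taskName" -> taskName), partition.toString,
      s"changelog partition mapping for $taskName", send, backoffMs)
}
```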


- Chris Riccomini


On July 10, 2014, 5:09 a.m., Jakob Homan wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/22215/
> -----------------------------------------------------------
> 
> (Updated July 10, 2014, 5:09 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Bugs: SAMZA-123
>     https://issues.apache.org/jira/browse/SAMZA-123
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Move topic partition grouping to the AM and generalize
> 
> 
> Diffs
> -----
> 
>   .gitignore db9d3ec
>   samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java 6fad1fa
>   samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java a6e1ba6
>   samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java 78d56a9
>   samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouper.java PRE-CREATION
>   samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouperFactory.java PRE-CREATION
>   samza-api/src/main/java/org/apache/samza/container/TaskName.java PRE-CREATION
>   samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java cb40092
>   samza-api/src/main/java/org/apache/samza/task/TaskContext.java 7c1b085
>   samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala 5735a39
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 9487b58
>   samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala 364e489
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala fcafe83
>   samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala 4c2d365
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 4ca340c
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 356adbb
>   samza-core/src/main/scala/org/apache/samza/container/SystemStreamPartitionTaskNameGrouper.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 99a9841
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala 7502124
>   samza-core/src/main/scala/org/apache/samza/container/TaskNamesToSystemStreamPartitions.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupByPartition.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupBySystemStreamPartition.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/SimpleSystemStreamPartitionTaskNameGrouper.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala f8865b1
>   samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala e20e7c1
>   samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala 3d0a484
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 7214151
>   samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala 4ccd604
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 11c23d0
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala bc54f9e
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 94f6f4c
>   samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala 50d9a05
>   samza-core/src/test/scala/org/apache/samza/container/SystemStreamPartitionGrouperTestBase.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala fa10231
>   samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 190bdfe
>   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 1f5e3bb
>   samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupByPartition.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupBySystemStreamPartition.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala 21d8a78
>   samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala 4f7ddcd
>   samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala 70d8c80
>   samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala 12f1e03
>   samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala a67ecdf
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala 15245d4
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala cb6dbdf
>   samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala 92ac61e
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala 6be9732
>   samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala 72562cf
>   samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java 222c130
>   samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala 0077af0
>   samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala dc44a99
>   samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml b2faebf
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala 01a2683
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala eb1ff54
>   samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala 520f784
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala f1139f5
> 
> Diff: https://reviews.apache.org/r/22215/diff/
> 
> 
> Testing
> -------
> 
> Existing and new unit tests. Now moving on to functional testing.
> 
> 
> Thanks,
> 
> Jakob Homan
> 
>
