-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/22215/#review44817
-----------------------------------------------------------


I'm not wild about SSP* for class names. Thus far the convention has been 
SystemStreamPartition*. I'd rather have it all one way, or all the other. If 
the gripe is that the SystemStreamPartition name is too long, that's being 
discussed in another ticket (rename to Stream/StreamPartition), and I believe 
you had said you had a completely alternative naming idea as well. Can we just 
stick with the existing class name convention, and discuss the naming issue 
separately?

The taskName: String style feels weird to me. First, does it make sense to use 
a class instead of a string? That was the point of using Partition initially, 
instead of just an int. Second, it seems to me that taskId is better than 
taskName now, after seeing it in code. I think either Sriram or Jay was pushing 
this name as well. What do you think about a TaskId class instead of a 
Partition class?
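
To make the TaskId idea a bit more concrete, here's a rough sketch of what I 
have in mind. The field, accessor, and toString format are placeholders I made 
up for illustration; nothing here is taken from the patch.

```java
import java.util.Objects;

// Hypothetical value class wrapping the task identifier, in the same spirit
// as using Partition instead of a bare int. All member names are assumptions.
final class TaskId {
    private final String id;

    TaskId(String id) {
        // Reject null up front so equals()/hashCode() never see it.
        this.id = Objects.requireNonNull(id, "id");
    }

    String getId() {
        return id;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof TaskId)) return false;
        return id.equals(((TaskId) o).id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }

    @Override
    public String toString() {
        return "TaskId(" + id + ")";
    }
}
```

With something like this, method signatures can take a TaskId rather than a 
String, so the compiler catches a stray stream or system name being passed 
where a task identifier is expected.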

I'm not a fan of the way we handle the taskName/partition mapping in the 
checkpoint manager. Can we just put the mapping in the checkpoint itself, 
rather than having distinct messages for it?

All new classes need docs (e.g. SSPTaskNameGrouper, GroupByPartition). 
It'd also be nice to have some docs on the website. Maybe in the 
container section.


build.gradle
<https://reviews.apache.org/r/22215/#comment79359>

    Can we just move SSPGrouperTestBase to samza-core instead? Szczepan says 
that this method of pulling in test source is not supported, and hacky, so I'd 
like to limit it as much as possible.
    
    The recommended alternative is apparently to create a samza-test submodule 
that contains all of the test code. Since we already have a samza-test 
submodule that's really integration tests, we'd have to either move the 
samza-test stuff to an integration test submodule, or create a second 
samza-test module (to avoid circular dependencies: samza-core -> samza-test -> 
samza-core).
    
    To me, the easiest fix seems to be to just move the one class to samza-core.



check
<https://reviews.apache.org/r/22215/#comment79357>

    Add license header.
    
    Add brief docs describing what this is for.



check
<https://reviews.apache.org/r/22215/#comment79358>

    Add eclipse.



samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
<https://reviews.apache.org/r/22215/#comment79373>

    This all seems a bit hacky. It'd be better if we could have this passed in 
through the constructor, and not have mutable methods.
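    
    Roughly what I mean, as a sketch only. CheckpointSketch and its 
String-keyed map are stand-ins I invented for illustration; they are not the 
real Checkpoint signature.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the class under review. The key and value types
// are assumptions chosen to keep the example self-contained.
final class CheckpointSketch {
    private final Map<String, String> offsets;

    CheckpointSketch(Map<String, String> offsets) {
        // Defensive copy: later changes to the caller's map do not leak in.
        // The unmodifiable wrapper means there are no mutators to misuse.
        this.offsets = Collections.unmodifiableMap(new HashMap<String, String>(offsets));
    }

    Map<String, String> getOffsets() {
        return offsets;
    }
}
```

    Everything is fixed at construction time, so there's no window where a 
half-populated object is visible to other code.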



samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
<https://reviews.apache.org/r/22215/#comment79360>

    Is this code-gen'd? Want to make sure we're not manually writing equals() 
since it's error-prone.



samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
<https://reviews.apache.org/r/22215/#comment79361>

    Is this code-gen'd? Want to make sure we're not manually writing 
hashCode() since it's error-prone.



samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
<https://reviews.apache.org/r/22215/#comment79362>

    specified partition -> specified task



samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
<https://reviews.apache.org/r/22215/#comment79364>

    Why do we need this method if we can get the taskName to partition mapping 
from checkpoints? Is this just a convenience method to read the mapping across 
all taskNames?
    
    If so, I'd rather keep this out of the interface, and just provide a Util 
method to handle this.
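    
    Something along these lines is what I'm picturing. PerTaskMappingReader, 
TaskMappingUtil, and readAll are names I made up, and using Integer for 
partition ids is an assumption to keep the sketch self-contained.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical, minimal view of a checkpoint manager: it only knows how to
// read the partitions recorded for a single task.
interface PerTaskMappingReader {
    Set<Integer> readPartitions(String taskName);
}

// Hypothetical helper that builds the full mapping outside the interface.
final class TaskMappingUtil {
    private TaskMappingUtil() {
    }

    static Map<String, Set<Integer>> readAll(PerTaskMappingReader reader,
                                             Collection<String> taskNames) {
        // LinkedHashMap keeps the caller's taskName order in the result.
        Map<String, Set<Integer>> result = new LinkedHashMap<String, Set<Integer>>();
        for (String taskName : taskNames) {
            result.put(taskName, reader.readPartitions(taskName));
        }
        return result;
    }
}
```

    That keeps the interface down to the per-task call, and every 
implementation gets the convenience method for free.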



samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
<https://reviews.apache.org/r/22215/#comment79365>

    Why do we need this method if we can set the taskName to partition mapping 
for each checkpoint? Is this just a convenience method to write the mapping 
once for all taskNames, since it's static?
    
    If so, I'd rather keep this out of the interface, and just provide a Util 
method to handle this.



samza-api/src/main/java/org/apache/samza/container/SSPGrouperFactory.java
<https://reviews.apache.org/r/22215/#comment79366>

    Docs are cute at the expense of readability. Can you just be direct here?



samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
<https://reviews.apache.org/r/22215/#comment79367>

    taskNames?
    
    Seems redundant to call it taskNameKeys since taskNames are keys by 
definition, right?



samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
<https://reviews.apache.org/r/22215/#comment79368>

    Lost the "set" verb in the method name here.



samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
<https://reviews.apache.org/r/22215/#comment79369>

    I'm a little confused here. Isn't the mapping a mapping from taskName to a 
set of partitions? Why is the type a string here?



samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
<https://reviews.apache.org/r/22215/#comment79370>

    I'm assuming this is still to-be-implemented. I think you mentioned this in 
the JIRA.



samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
<https://reviews.apache.org/r/22215/#comment79372>

    Can we just import scala.collection._ to make this a little more succinct?



samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
<https://reviews.apache.org/r/22215/#comment79377>

    I'm confused. These configs are named the same thing. Is this intentional? 
Seems like maybe SSP_TASK_NAME_GROUPER_FACTORY should be deleted. It's unused.



samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
<https://reviews.apache.org/r/22215/#comment79401>

    Just so I'm clear, this is only used for the state store mapping? If a job 
doesn't use state stores, this env variable doesn't really get used?



samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/22215/#comment79399>

    Can we break all of this into a separate method in the SamzaContainer 
object, to keep apply() no worse than it was before? The method is 
already kind of a mess, so I'm trying to not make it worse.



samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/22215/#comment79400>

    This log line is a little dated. There is no longer a partition manager. A 
more accurate statement would be that the SystemAdmin wasn't able to find any 
partitions for input streams.



samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/22215/#comment79404>

    Do we need the asJavaCollection stuff? We're importing JavaConversions._ 
already.



samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/22215/#comment79407>

    Do we need the type here? Seems redundant.



samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
<https://reviews.apache.org/r/22215/#comment79409>

    Any way to make this a little cleaner/more succinct? Totally a nit, but 
these long scala.* calls just bug me. If you feel strongly about it, I'm OK 
with it the way it is, but just voicing my preference.



samza-core/src/main/scala/org/apache/samza/container/TaskNameToSSPs.scala
<https://reviews.apache.org/r/22215/#comment79413>

    Oh dear. I feel like the better thing to do here is just to have a custom 
object that doesn't have to deal with all the Scala junk.



samza-core/src/main/scala/org/apache/samza/container/TaskNameToSSPs.scala
<https://reviews.apache.org/r/22215/#comment79416>

    This should be plural (TaskNamesToSSPs) since it's a map from multiple 
taskNames to multiple SSPs.



samza-core/src/main/scala/org/apache/samza/container/ssp/groupers/GroupInNSets.scala
<https://reviews.apache.org/r/22215/#comment79415>

    Can we just use the same pattern we've been using here? Either job.num.sets 
or task.num.sets or something?



samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala
<https://reviews.apache.org/r/22215/#comment79417>

    I think you've renamed it because it clashes with the new "taskName" 
nomenclature. I don't think this is a jobName, though, right? Isn't it more of 
a containerName?



samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala
<https://reviews.apache.org/r/22215/#comment79418>

    I am having trouble following this code. I don't really have any good 
suggestions, but I think it could maybe use a little more cleanup. Maybe I'm in 
the minority, though. Anyway... I'm confused. :)



samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
<https://reviews.apache.org/r/22215/#comment79420>

    Clean up.



samza-core/src/main/scala/org/apache/samza/util/Util.scala
<https://reviews.apache.org/r/22215/#comment79427>

    I think I'm confused. We can assign multiple taskNames to the same task? I 
thought we had a 1:1 between TaskInstance -> taskName -> SSPs?



samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment79429>

    Skipping review of this class. I think I need a walkthrough to understand 
what changes are being made here.



samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
<https://reviews.apache.org/r/22215/#comment79432>

    Delete or uncomment. Not sure what you're going for here.



samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
<https://reviews.apache.org/r/22215/#comment79437>

    Still confused. I thought task:taskName mapping was 1:1. Probably 
forgetting things. Can you explain this to me again?



samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
<https://reviews.apache.org/r/22215/#comment79438>

    Not too excited to delete all of these tests without corresponding tests for 
taskNames.


- Chris Riccomini


On June 3, 2014, 7:29 p.m., Jakob Homan wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/22215/
> -----------------------------------------------------------
> 
> (Updated June 3, 2014, 7:29 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Bugs: SAMZA-123
>     https://issues.apache.org/jira/browse/SAMZA-123
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Move topic partition grouping to the AM and generalize
> 
> 
> Diffs
> -----
> 
>   build.gradle 1a1db16 
>   check PRE-CREATION 
>   samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java dcf81bf 
>   samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java 
> 34f50fd 
>   samza-api/src/main/java/org/apache/samza/container/SSPGrouper.java 
> PRE-CREATION 
>   samza-api/src/main/java/org/apache/samza/container/SSPGrouperFactory.java 
> PRE-CREATION 
>   
> samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java 
> 5aa7a8f 
>   samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java 5ec6433 
>   samza-api/src/main/java/org/apache/samza/task/TaskContext.java 611507e 
>   samza-api/src/test/java/org/apache/samza/container/SSPGrouperTestBase.java 
> PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala 
> 5735a39 
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 
> 9487b58 
>   
> samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
>  364e489 
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala fcafe83 
>   samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala 
> 4c2d365 
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 4ca340c 
>   
> samza-core/src/main/scala/org/apache/samza/container/SSPTaskNameGrouper.scala 
> PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 7ca8af6 
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
> 99a9841 
>   
> samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
>  7502124 
>   samza-core/src/main/scala/org/apache/samza/container/TaskNameToSSPs.scala 
> PRE-CREATION 
>   
> samza-core/src/main/scala/org/apache/samza/container/ssp/groupers/GroupByPartition.scala
>  PRE-CREATION 
>   
> samza-core/src/main/scala/org/apache/samza/container/ssp/groupers/GroupBySSP.scala
>  PRE-CREATION 
>   
> samza-core/src/main/scala/org/apache/samza/container/ssp/groupers/GroupInNSets.scala
>  PRE-CREATION 
>   
> samza-core/src/main/scala/org/apache/samza/container/ssp/taskname/groupers/SimpleSSPTaskNameGrouper.scala
>  PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala 
> f8865b1 
>   samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala 
> e20e7c1 
>   
> samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala 
> 2ed8d7d 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
> 7214151 
>   samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala 
> 4ccd604 
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 1b548fd 
>   
> samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
>  bc54f9e 
>   
> samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 
> 552f8c2 
>   
> samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala
>  50d9a05 
>   samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 
> fa10231 
>   
> samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
> 190bdfe 
>   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 
> 1f5e3bb 
>   
> samza-core/src/test/scala/org/apache/samza/container/ssp/groupers/TestGroupByPartition.scala
>  PRE-CREATION 
>   
> samza-core/src/test/scala/org/apache/samza/container/ssp/groupers/TestGroupBySSP.scala
>  PRE-CREATION 
>   
> samza-core/src/test/scala/org/apache/samza/container/ssp/groupers/TestGroupInNSets.scala
>  PRE-CREATION 
>   samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala 21d8a78 
>   
> samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala
>  70d8c80 
>   
> samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala 
> 12f1e03 
>   samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b8c369b 
>   
> samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
>  62c91e8 
>   
> samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
>  cb6dbdf 
>   
> samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
>  92ac61e 
>   
> samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
>  dae3c2c 
>   
> samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java 
> 222c130 
>   
> samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
>  5b9b926 
>   
> samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
>  10502a9 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala 
> c28c9a6 
>   
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala 
> 01a2683 
>   
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
>  eb1ff54 
>   
> samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
>  17a96f0 
>   
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
>  0442580 
> 
> Diff: https://reviews.apache.org/r/22215/diff/
> 
> 
> Testing
> -------
> 
> Existing and new unit.  Now moving on to function.
> 
> 
> Thanks,
> 
> Jakob Homan
> 
>
