> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > 1. Could you update the docs on the JIRA to describe how the checkpointing 
> > actually works now?
> > 2. I know this is churn, but I'm not a fan of TaskName/taskName after 
> > seeing it in use. I think TaskId/taskId is the way to go (per Sriram's 
> > comments in the JIRA).
> > 3. You refer to "StateLog" everywhere. I think the rest of the code calls 
> > this a "Changelog". I think we should stick with changelog.
> > 4. It seems like you have something funny going on with your whitespace. I 
> > see { foo => bar}, both (foo:Bar) and (foo: Bar), both foo => bar and foo=> 
> > bar, etc. I'm guessing that this is because your formatter would totally 
> > nuke Eclipse's. Can you just run your formatting over the changes?
> > 5. Is it ever possible for us to have two different changelog partitions 
> > for a single taskName? Even if more than one store is defined for a job?
> > 6. Not crazy about leaking the term SSP to config, logging, or public APIs. 
> > You've introduced it all over the place. Prior to this, it wasn't exposed. 
> > I'm in favor of avoiding acronyms in favor of clarity wherever possible.
> > 7. It seems like we need a follow-on ticket to introduce some common object 
> > that the LocalJobFactory and SamzaAppMasterTaskManager can share for 
> > handling task assignment. Right now, a lot of this is in Util 
> > (getTaskNameToStateLogPartitionMapping, resolveTaskNameToPartitionMapping)
> > 8. Lots of tests needed.

1. Will do.
2. I'm still pining for Cohort, but can live with TaskName.  TaskId to me means 
a number, so I'm reluctant to go that way.
3. Fixed.
4. Fixed as I saw them.
5. No, that should not happen.
6. Removed all of them I could find that are public facing.
7. Agreed.
8. There are tests for all the new code (except the 
TaskNameToChangeLogPartitionMapping returning an empty map). 95% of the code is 
a refactor and I've been re-running the unit tests to check for regressions 
with every change.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java, line 36
> > <https://reviews.apache.org/r/22215/diff/3/?file=623707#file623707line36>
> >
> >     Delete this?

done


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java, line 54
> > <https://reviews.apache.org/r/22215/diff/3/?file=623707#file623707line54>
> >
> >     I can't find anywhere where this method is used. I think it should be 
> > removed, no?
> >     
> >     If this should be kept, it should be used and javadoc'd.

done


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java, line 58
> > <https://reviews.apache.org/r/22215/diff/3/?file=623707#file623707line58>
> >
> >     I can't find anywhere where this method is used. I think it should be 
> > removed, no?
> >     
> >     If this should be kept, it should be used and javadoc'd.

done


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java, 
> > line 55
> > <https://reviews.apache.org/r/22215/diff/3/?file=623708#file623708line55>
> >
> >     This is kafka-specific documentation. A file-based checkpoint manager 
> > has no log, for example.

done.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java, 
> > line 57
> > <https://reviews.apache.org/r/22215/diff/3/?file=623708#file623708line57>
> >
> >     Seems like it should return an empty map if there were no messages.

done.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java, 
> > line 61
> > <https://reviews.apache.org/r/22215/diff/3/?file=623708#file623708line61>
> >
> >     Javadoc.

done.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java, 
> > line 66
> > <https://reviews.apache.org/r/22215/diff/3/?file=623708#file623708line66>
> >
> >     Recommend removing. There is nothing that says a checkpoint manager 
> > must have a log. I think this should just be done in the existing start() 
> > method for checkpoint managers that require some initialization.

done.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouperFactory.java,
> >  line 27
> > <https://reviews.apache.org/r/22215/diff/3/?file=623711#file623711line27>
> >
> >     getSystemStreamPartitionGrouper

done.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-api/src/main/java/org/apache/samza/container/TaskName.java, line 21
> > <https://reviews.apache.org/r/22215/diff/3/?file=623712#file623712line21>
> >
> >     Javadoc.

done.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java, line 39
> > <https://reviews.apache.org/r/22215/diff/3/?file=623713#file623713line39>
> >
> >     Can we just have setSystemStreamPartitions here?

done


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala, 
> > line 33
> > <https://reviews.apache.org/r/22215/diff/3/?file=623715#file623715line33>
> >
> >     Per offline discussion, it'd be good to have this fixed up again.

Will do in a follow-up iteration.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala, 
> > line 158
> > <https://reviews.apache.org/r/22215/diff/3/?file=623716#file623716line158>
> >
> >     Is this necessary? If so, should be a TODO. Else remove.

Done.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala, 
> > line 161
> > <https://reviews.apache.org/r/22215/diff/3/?file=623716#file623716line161>
> >
> >     What do you think about just register(taskName, 
> > Set[SystemStreamPartition]) here? I think we have this pattern in some 
> > other places.

Done.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala, 
> > line 205
> > <https://reviews.apache.org/r/22215/diff/3/?file=623716#file623716line205>
> >
> >     getOrElse(..., throw new SamzaException())

I don't understand this comment.
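For reference, the reviewer appears to be suggesting Scala's `Map.getOrElse` with a `throw` as the default argument. Because the default is a by-name parameter and `throw` has type `Nothing`, the exception is only raised when the key is missing, and the call still type-checks as returning the map's value type. A minimal sketch; `lastProcessedOffsets` and `lookupOffset` are hypothetical, and `IllegalStateException` stands in for `SamzaException` so the snippet compiles without Samza on the classpath:

```scala
object GetOrElseThrowSketch {
  // Hypothetical stand-in for a taskName -> offset map kept by OffsetManager.
  val lastProcessedOffsets: Map[String, String] = Map("Partition 0" -> "1234")

  // Returns the offset for taskName, or fails fast with a descriptive message.
  // The default argument is by-name, so the throw only runs when the key is absent;
  // `throw` has type Nothing, so the result type is still String.
  def lookupOffset(taskName: String): String =
    lastProcessedOffsets.getOrElse(
      taskName,
      throw new IllegalStateException("No offset registered for task " + taskName))
}
```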


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala, 
> > line 283
> > <https://reviews.apache.org/r/22215/diff/3/?file=623716#file623716line283>
> >
> >     Do we need all of this? We import scala.collection.JavaConversions._ 
> > above

done.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala, line 37
> > <https://reviews.apache.org/r/22215/diff/3/?file=623718#file623718line37>
> >
> >     Not a fan of leaking ssp into user facing interfaces and config names.
> >     
> >     job.partition.grouper.factory?
> >     job.grouper.factory?
> >     job.partitioner.factory?

Changed to job.systemstreampartition.grouper.factory.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala, line 53
> > <https://reviews.apache.org/r/22215/diff/3/?file=623718#file623718line53>
> >
> >     Unless my eyes are getting messed up, aren't these two methods 
> > identical?

done.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala, 
> > line 31
> > <https://reviews.apache.org/r/22215/diff/3/?file=623719#file623719line31>
> >
> >     I think I had a previous comment on this. Maybe I'm contradicting my 
> > previous statement, but this env name is confusing. I think it should be 
> > ENV_TASK_NAME_TO_CHANGELOG_PARTITION_MAPPING

done.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala, line 52
> > <https://reviews.apache.org/r/22215/diff/3/?file=623720#file623720line52>
> >
> >     The complexity of this code bothers me. I don't like the amount of 
> > abbreviated acronyms combined with the functional coding combined with the 
> > weird spacing (some spaces after colons/some not, some spaces next to 
> > parens/some not, some brackets/some parens).

Expanded all the abbreviations, made spacing consistent. 


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala, line 37
> > <https://reviews.apache.org/r/22215/diff/3/?file=623720#file623720line37>
> >
> >     Can we just have a Set[TaskInstance] instead, since TaskInstance has 
> > taskName in it?

We do multiple lookups based on the taskName here, so we'd have to change 
those to filter through the set to find that particular taskName...
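To illustrate the trade-off: a map keyed by task name answers each lookup with a single hash lookup, while a plain `Set[TaskInstance]` turns every lookup into a linear scan (either structure can be derived from the other). A minimal sketch with a hypothetical `Instance` class standing in for `TaskInstance`:

```scala
object TaskLookupSketch {
  // Hypothetical stand-in for TaskInstance; only the task name matters here.
  final case class Instance(taskName: String)

  val instances: Set[Instance] = Set(Instance("Partition 0"), Instance("Partition 1"))

  // Keyed by task name: one hash lookup per call.
  val byTaskName: Map[String, Instance] = instances.map(i => i.taskName -> i).toMap

  def viaMap(taskName: String): Option[Instance] = byTaskName.get(taskName)

  // Set-only alternative: every lookup scans all instances.
  def viaSet(taskName: String): Option[Instance] = instances.find(_.taskName == taskName)
}
```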


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala, line 90
> > <https://reviews.apache.org/r/22215/diff/3/?file=623720#file623720line90>
> >
> >     This newline bugs me.

I think I found the one you're talking about...


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala, 
> > line 75
> > <https://reviews.apache.org/r/22215/diff/3/?file=623721#file623721line75>
> >
> >     Delete newline.

done.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala, 
> > line 92
> > <https://reviews.apache.org/r/22215/diff/3/?file=623721#file623721line92>
> >
> >     This is complicated and hard to read. I know the rest of this class is 
> > kind of a mess, but I want to be mindful not to make it worse.
> >     
> >     1. deserialize vs decode.
> >     2. more comments.
> >     3. inconsistent variable naming patterns btwn state log variables and 
> > ssp variables (taskNameToSSPMapping?).

done.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala, 
> > line 399
> > <https://reviews.apache.org/r/22215/diff/3/?file=623721#file623721line399>
> >
> >     Can we yank the new Partition() stuff into a separate line?

done.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala, 
> > line 426
> > <https://reviews.apache.org/r/22215/diff/3/?file=623721#file623721line426>
> >
> >     Can we yank the new Partition() stuff into a separate line?

done


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala, 
> > line 437
> > <https://reviews.apache.org/r/22215/diff/3/?file=623721#file623721line437>
> >
> >     Can we re-use the new Partition() from above?

done.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala, 
> > line 439
> > <https://reviews.apache.org/r/22215/diff/3/?file=623721#file623721line439>
> >
> >     More descriptive exception message.

done.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala, 
> > line 441
> > <https://reviews.apache.org/r/22215/diff/3/?file=623721#file623721line441>
> >
> >     Before this was SystemStream, but now it's SystemStreamPartitions. I 
> > think it should be "Retrieved SystemStreamPartitions..."

done.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala, 
> > line 47
> > <https://reviews.apache.org/r/22215/diff/3/?file=623723#file623723line47>
> >
> >     Can we just do ._ instead?

Done.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala, 
> > line 51
> > <https://reviews.apache.org/r/22215/diff/3/?file=623723#file623723line51>
> >
> >     Not really a fan of this. The only place this is used is in the 
> > RunLoop, and the run loop actually already has the TaskName:TaskInstance 
> > map itself. I'd rather have one or the other, but not both. I mentioned 
> > this above, as well. My personal preference is no val here, and keep the 
> > map in the RunLoop. On top of this, I think you can use 
> > taskInstance.context.getTaskName.

This value is used 28 times within the class. I'm not sure I follow.  Let's 
talk offline.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala, 
> > line 60
> > <https://reviews.apache.org/r/22215/diff/3/?file=623723#file623723line60>
> >
> >     Is this used anywhere outside of TaskInstance?

Yes, it's used by the RunLoop to build the map of SSP to TaskInstances.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/TaskNamesToSystemStreamPartitions.scala,
> >  line 24
> > <https://reviews.apache.org/r/22215/diff/3/?file=623725#file623725line24>
> >
> >     Javadocs.

done.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/ssp/groupers/GroupByPartition.scala,
> >  line 28
> > <https://reviews.apache.org/r/22215/diff/3/?file=623726#file623726line28>
> >
> >     Javadoc.

done.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/ssp/groupers/GroupBySystemStreamPartition.scala,
> >  line 28
> > <https://reviews.apache.org/r/22215/diff/3/?file=623727#file623727line28>
> >
> >     Javadoc.

done.
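As an illustration of what the two groupers are meant to do, here is a sketch assuming that `GroupByPartition` puts every SystemStreamPartition sharing a partition number into one task, and `GroupBySystemStreamPartition` gives each SystemStreamPartition a task of its own. The `Ssp` case class and the task-name strings are hypothetical simplifications, not the actual Samza types:

```scala
object GrouperSketch {
  // Simplified, hypothetical stand-in for SystemStreamPartition.
  final case class Ssp(system: String, stream: String, partition: Int)

  // By partition: partition N of every input stream lands in the task "Partition N".
  def groupByPartition(ssps: Set[Ssp]): Map[String, Set[Ssp]] =
    ssps.groupBy(ssp => "Partition " + ssp.partition)

  // By SystemStreamPartition: each SSP gets a task containing only itself.
  def groupBySystemStreamPartition(ssps: Set[Ssp]): Map[String, Set[Ssp]] =
    ssps.map(ssp => s"${ssp.system}.${ssp.stream}.${ssp.partition}" -> Set(ssp)).toMap
}
```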


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala, 
> > line 32
> > <https://reviews.apache.org/r/22215/diff/3/?file=623729#file623729line32>
> >
> >     Serialize vs. encode inconsistency bugs me.

All use serialize now.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala, 
> > line 33
> > <https://reviews.apache.org/r/22215/diff/3/?file=623729#file623729line33>
> >
> >     Can you just do .mapValues(_.toInt)?

No, both steps are necessary to go from a Java-type map to a Scala-type map. 
Scala won't autoconvert the contents of the map (the Integer values), so the 
values need a conversion of their own.
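A minimal sketch of the two steps, assuming the deserialized mapping arrives as a `java.util.Map[String, java.lang.Integer]` (the object and method names are hypothetical). It uses `scala.collection.JavaConverters` for the explicit `asScala` wrapper; on Scala 2.13 and later that import is deprecated in favor of `scala.jdk.CollectionConverters`:

```scala
import scala.collection.JavaConverters._

object JavaMapConversionSketch {
  // Converts e.g. a deserialized taskName -> changelog partition mapping with boxed
  // Integer values into an immutable Scala Map[String, Int].
  def toScala(javaMap: java.util.Map[String, java.lang.Integer]): Map[String, Int] =
    javaMap.asScala                                               // step 1: wrap the Java map as a Scala map
      .map { case (name, partition) => name -> partition.intValue } // step 2: unbox each value
      .toMap
}
```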


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala, 
> > line 37
> > <https://reviews.apache.org/r/22215/diff/3/?file=623730#file623730line37>
> >
> >     What about "local-container"?

done.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala, 
> > line 52
> > <https://reviews.apache.org/r/22215/diff/3/?file=623730#file623730line52>
> >
> >     Can we do mapValues(Integer.valueOf) here?

That would lose the original mapping, which we need...


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala, 
> > line 85
> > <https://reviews.apache.org/r/22215/diff/3/?file=623730#file623730line85>
> >
> >     Remove white space.

done.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala,
> >  line 39
> > <https://reviews.apache.org/r/22215/diff/3/?file=623731#file623731line39>
> >
> >     Yeah, this class is kind of a mess. I think that we should refactor 
> > this in a separate ticket. Could you open a follow on JIRA? We can discuss 
> > the best way to do so there.

will do.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala,
> >  line 52
> > <https://reviews.apache.org/r/22215/diff/3/?file=623731#file623731line52>
> >
> >     Some docs about the checkpoint format would be helpful.

done.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala,
> >  line 69
> > <https://reviews.apache.org/r/22215/diff/3/?file=623731#file623731line69>
> >
> >     Why not:
> >     
> >     [
> >       {
> >         "system": "kafka",
> >         "stream": "foo",
> >         "partition": 4,
> >         "offset": 1234
> >       },
> >       ...
> >     ]
> >

done.
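A minimal sketch of rendering checkpoint offsets in the flat array shape suggested above. `OffsetEntry` is a hypothetical flattened entry, offsets are assumed to be opaque strings (so they are quoted, unlike the numeric offset in the example), and the hand-rolled escaping only covers quotes and backslashes; a real serde would use a JSON library instead:

```scala
object CheckpointJsonSketch {
  // Hypothetical flattened form of one checkpoint entry.
  final case class OffsetEntry(system: String, stream: String, partition: Int, offset: String)

  // Minimal JSON string quoting: escapes backslashes and double quotes only.
  private def quote(s: String): String =
    "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

  // Renders the entries as a JSON array of flat objects.
  def toJson(entries: Seq[OffsetEntry]): String =
    entries.map { e =>
      s"""{"system": ${quote(e.system)}, "stream": ${quote(e.stream)}, "partition": ${e.partition}, "offset": ${quote(e.offset)}}"""
    }.mkString("[", ", ", "]")
}
```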


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala,
> >  line 95
> > <https://reviews.apache.org/r/22215/diff/3/?file=623731#file623731line95>
> >
> >     putAll?

done.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala,
> >  line 98
> > <https://reviews.apache.org/r/22215/diff/3/?file=623731#file623731line98>
> >
> >     Delete empty line.

done.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 117
> > <https://reviews.apache.org/r/22215/diff/3/?file=623734#file623734line117>
> >
> >     You mean which taskNames go to which containers?

Done.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 122
> > <https://reviews.apache.org/r/22215/diff/3/?file=623734#file623734line122>
> >
> >     This method is confusingly named. Shouldn't it be like 
> > assignTaskNamesToContainers?
> >     
> >     Can you move this to TaskNamesToSystemStreamPartitions.apply()? The 
> > pattern I've been moving toward is to have wiring be done as apply() 
> > methods in companion objects. See DefaultChooser as an example. The idea is 
> > that it keeps SamzaContainer.apply from getting any worse, and keeps the 
> > wiring close to the class that the wiring is instantiating.

This isn't used by TaskNamesToSystemStreamPartitions directly; it's used in the 
command builders. By that point the TaskNamesToSystemStreamPartitions instances 
have already been created...
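For reference, a sketch of the companion-object `apply()` wiring pattern the reviewer describes, applied to a hypothetical `ContainerAssignment` class. The round-robin policy for dealing task names out to containers is an assumption for illustration, not necessarily what `Util` does today:

```scala
object CompanionApplySketch {
  // Hypothetical holder for which task names each container runs.
  final class ContainerAssignment private (val taskNamesByContainer: Map[Int, Set[String]])

  object ContainerAssignment {
    // The wiring lives next to the class it builds: task names are sorted for a
    // deterministic order, then dealt out round-robin across containerCount containers.
    def apply(taskNames: Seq[String], containerCount: Int): ContainerAssignment = {
      require(containerCount > 0, "containerCount must be positive")
      val byContainer = taskNames.sorted.zipWithIndex
        .groupBy { case (_, index) => index % containerCount }
        .map { case (containerId, named) => containerId -> named.map(_._1).toSet }
      new ContainerAssignment(byContainer)
    }
  }
}
```

Callers then write `ContainerAssignment(taskNames, count)`, which keeps this logic out of `SamzaContainer.apply` and `Util`.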


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 130
> > <https://reviews.apache.org/r/22215/diff/3/?file=623734#file623734line130>
> >
> >     choorts. SystemStreamPartition groups?

done.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 141
> > <https://reviews.apache.org/r/22215/diff/3/?file=623734#file623734line141>
> >
> >     Can't we just do TaskNamesToSystemStreamPartitions(groups) here?

Not sure I understand. sspTaskNamesAsJava is defined by the closure that ends 
in groups, so as not to spew a bunch of local, intermediate variables into the 
method and to delineate the work more clearly. That result is then what's fed 
to TaskNamesToSystemStreamPartitions...


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 201
> > <https://reviews.apache.org/r/22215/diff/3/?file=623734#file623734line201>
> >
> >     What do you think about moving all four of these methods 
> > (serde/deserde/encode/decode) into a ShellCommandBuilder companion object?

Done.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 225
> > <https://reviews.apache.org/r/22215/diff/3/?file=623734#file623734line225>
> >
> >     serialize instead of encode. Name could be shorter, too.

Done.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 233
> > <https://reviews.apache.org/r/22215/diff/3/?file=623734#file623734line233>
> >
> >     deserialize instead of decode. Name could be shorter, too.

Done.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 249
> > <https://reviews.apache.org/r/22215/diff/3/?file=623734#file623734line249>
> >
> >     For Kafka, do we have to run a topic partition expansion on the 
> > changelog partition count in order for this to work?

Only if new partitions are added, which is the same as the current behavior.
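One way to read this: if existing taskName-to-changelog-partition assignments are kept and each new task name is given the next unused partition number, the changelog only needs more partitions when new task names (i.e. new input partitions) appear. A hypothetical sketch of that assignment rule, not the actual `Util` code:

```scala
object ChangelogPartitionAssignmentSketch {
  // Keeps every existing taskName -> changelog partition assignment and gives each
  // task name without one the next unused partition number, in sorted name order.
  def assign(existing: Map[String, Int], allTaskNames: Seq[String]): Map[String, Int] = {
    val nextPartition = if (existing.isEmpty) 0 else existing.values.max + 1
    val newTaskNames = allTaskNames.filterNot(name => existing.contains(name)).distinct.sorted
    val added = newTaskNames.zipWithIndex.map { case (name, i) => name -> (nextPartition + i) }
    existing ++ added
  }
}
```

Under this rule, a Kafka changelog topic would need at least `max + 1` partitions once new task names are added, which is where a partition expansion would come in.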


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 296
> > <https://reviews.apache.org/r/22215/diff/3/?file=623734#file623734line296>
> >
> >     Javadocs.

done.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 305
> > <https://reviews.apache.org/r/22215/diff/3/?file=623734#file623734line305>
> >
> >     null for metrics registry? This seems dangerous, and breaks our 
> > existing pattern.

done.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 313
> > <https://reviews.apache.org/r/22215/diff/3/?file=623734#file623734line313>
> >
> >     white space.

done.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 319
> > <https://reviews.apache.org/r/22215/diff/3/?file=623734#file623734line319>
> >
> >     white space.

done.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 331
> > <https://reviews.apache.org/r/22215/diff/3/?file=623734#file623734line331>
> >
> >     What if newMapping == null?

Fixed by the change to the semantics made above.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala,
> >  line 61
> > <https://reviews.apache.org/r/22215/diff/3/?file=623749#file623749line61>
> >
> >     Add a note about the topic being a single partition

done.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala,
> >  line 88
> > <https://reviews.apache.org/r/22215/diff/3/?file=623749#file623749line88>
> >
> >     Put these in a companion object and use javadocs to document

done.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala,
> >  line 268
> > <https://reviews.apache.org/r/22215/diff/3/?file=623749#file623749line268>
> >
> >     "taskName to checkpoint mapping"

done.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala,
> >  line 271
> > <https://reviews.apache.org/r/22215/diff/3/?file=623749#file623749line271>
> >
> >     "Already existing checkpoint mapping"

done.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala,
> >  line 301
> > <https://reviews.apache.org/r/22215/diff/3/?file=623749#file623749line301>
> >
> >     Given that you're reading through the entire checkpoint topic for 
> > partition 0, we should set a smaller segment size and enable log 
> > compaction here. The smaller segment size will allow more of the topic to 
> > be compacted, once compaction is enabled. This should drastically speed up 
> > container startup time.

Should that be done here or by the SREs/Ops? Users may wish to keep a larger 
log for various reasons...
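If it does end up in code, one option that addresses both concerns is to apply compaction-friendly defaults and let user configuration override them. `cleanup.policy` and `segment.bytes` are standard Kafka topic-level settings; the 25 MB default and the `topicProperties` helper are hypothetical:

```scala
import java.util.Properties

object CheckpointTopicConfigSketch {
  // Hypothetical defaults: compaction keeps only the latest checkpoint per key, and a
  // small segment size lets more of the log become eligible for compaction
  // (Kafka never compacts the active segment).
  val defaults: Map[String, String] = Map(
    "cleanup.policy" -> "compact",
    "segment.bytes" -> (25 * 1024 * 1024).toString)

  // User-supplied overrides win, so operators can still keep a larger log.
  def topicProperties(overrides: Map[String, String]): Properties = {
    val props = new Properties()
    (defaults ++ overrides).foreach { case (key, value) => props.setProperty(key, value) }
    props
  }
}
```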


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala,
> >  line 96
> > <https://reviews.apache.org/r/22215/diff/3/?file=623750#file623750line96>
> >
> >     Maybe we should change the prefix (add a version number?) so that we 
> > don't accidentally use this new checkpoint manager with a legacy topic?

Added a version number. Great idea.
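A sketch of the versioned naming idea, assuming the version is embedded directly in the checkpoint topic name. The name format, `CheckpointLogVersion`, and `topicName` are hypothetical, not necessarily what `KafkaCheckpointManagerFactory` uses:

```scala
object CheckpointTopicNameSketch {
  // Hypothetical version constant; bump it whenever the checkpoint format changes.
  val CheckpointLogVersion = 1

  // Embeds the version in the topic name so a new checkpoint manager never reads
  // a topic written by a legacy, incompatible one.
  def topicName(jobName: String, jobId: String): String =
    "__samza_checkpoint_ver_%d_for_%s_%s".format(CheckpointLogVersion, jobName, jobId)
}
```

Bumping the constant makes a new manager look for a different topic instead of misreading a legacy one.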


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala,
> >  line 77
> > <https://reviews.apache.org/r/22215/diff/3/?file=623758#file623758line77>
> >
> >     Seems like some of this logic could be extracted out into a single 
> > logic that we can share with the LocalJobFactory (and future job thingies 
> > like Mesos). Maybe a follow on ticket?

Yeah, there's definite work to be done here.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala,
> >  line 121
> > <https://reviews.apache.org/r/22215/diff/3/?file=623758#file623758line121>
> >
> >     mapValues?

No, we need the full map, as above.


- Jakob


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/22215/#review47416
-----------------------------------------------------------


On July 7, 2014, 4:18 p.m., Jakob Homan wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/22215/
> -----------------------------------------------------------
> 
> (Updated July 7, 2014, 4:18 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Bugs: SAMZA-123
>     https://issues.apache.org/jira/browse/SAMZA-123
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Move topic partition grouping to the AM and generalize
> 
> 
> Diffs
> -----
> 
>   .gitignore db9d3ec
>   samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java 6fad1fa
>   samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java a6e1ba6
>   samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java 78d56a9
>   samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouper.java PRE-CREATION
>   samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouperFactory.java PRE-CREATION
>   samza-api/src/main/java/org/apache/samza/container/TaskName.java PRE-CREATION
>   samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java cb40092
>   samza-api/src/main/java/org/apache/samza/task/TaskContext.java 7c1b085
>   samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala 5735a39
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 9487b58
>   samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala 364e489
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala fcafe83
>   samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala 4c2d365
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 4ca340c
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 356adbb
>   samza-core/src/main/scala/org/apache/samza/container/SystemStreamPartitionTaskNameGrouper.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 99a9841
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala 7502124
>   samza-core/src/main/scala/org/apache/samza/container/TaskNamesToSystemStreamPartitions.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/ssp/groupers/GroupByPartition.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/ssp/groupers/GroupBySystemStreamPartition.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/ssp/taskname/groupers/SimpleSystemStreamPartitionTaskNameGrouper.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala f8865b1
>   samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala e20e7c1
>   samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala 3d0a484
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 7214151
>   samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala 4ccd604
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 11c23d0
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala bc54f9e
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 94f6f4c
>   samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala 50d9a05
>   samza-core/src/test/scala/org/apache/samza/container/SystemStreamPartitionGrouperTestBase.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala fa10231
>   samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 190bdfe
>   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 1f5e3bb
>   samza-core/src/test/scala/org/apache/samza/container/ssp/groupers/TestGroupByPartition.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/container/ssp/groupers/TestGroupBySystemStreamPartition.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala 21d8a78
>   samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala 4f7ddcd
>   samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala 70d8c80
>   samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala 12f1e03
>   samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala a67ecdf
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala 15245d4
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala cb6dbdf
>   samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala 92ac61e
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala 6be9732
>   samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala dae3c2c
>   samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java 222c130
>   samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala 0077af0
>   samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala dc44a99
>   samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml b2faebf
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala 01a2683
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala eb1ff54
>   samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala 520f784
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala f1139f5
> 
> Diff: https://reviews.apache.org/r/22215/diff/
> 
> 
> Testing
> -------
> 
> Existing and new unit tests. Now moving on to functional testing.
> 
> 
> Thanks,
> 
> Jakob Homan
> 
>
