> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > 1. Could you update the docs on the JIRA to describe how the checkpointing
> > actually works now?
> > 2. I know this is churn, but I'm not a fan of TaskName/taskName after
> > seeing it in use. I think TaskId/taskId is the way to go (per-Sriram's
> > comments in the JIRA).
> > 3. You refer to "StateLog" everywhere. I think the rest of the code calls
> > this a "Changelog". I think we should stick with changelog.
> > 4. It seems like you have something funny going on with your whitespace. I
> > see { foo => bar}, both (foo:Bar) and (foo: Bar), both foo => bar and foo=>
> > bar, etc. I'm guessing that this is because your formatter would totally
> > nuke Eclipse's. Can you just run your formatting over the changes?
> > 5. Is it ever possible for us to have two different changelog partitions
> > for a single taskName? Even if more than one store is defined for a job?
> > 6. Not crazy about leaking the term SSP to config, logging, or public APIs.
> > You've introduced it all over the place. Prior to this, it wasn't exposed.
> > I'm in favor of avoiding acronyms in favor of clarity wherever possible.
> > 7. It seems like we need a follow-on ticket to introduce some common object
> > that the LocalJobFactory and SamzaAppMasterTaskManager can share for
> > handling task assignment. Right now, a lot of this is in Util
> > (getTaskNameToStateLogPartitionMapping, resolveTaskNameToPartitionMapping)
> > 8. Lots of tests needed.
1. Will do.
2. I'm still pining for Cohort, but can live with TaskName. TaskId to me means
a number, so I'm reluctant to go that way.
3. Fixed.
4. Fixed as I saw them.
5. No, that should not happen.
6. Removed all of them I could find that are public facing.
7. Agreed.
8. There are tests for all the new code (except the
TaskNameToChangeLogPartitionMapping returning an empty map). 95% of the code is
a refactor and I've been re-running the unit tests to check for regressions
with every change.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java, line 36
> > <https://reviews.apache.org/r/22215/diff/3/?file=623707#file623707line36>
> >
> > Delete this?
done
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java, line 54
> > <https://reviews.apache.org/r/22215/diff/3/?file=623707#file623707line54>
> >
> > I can't find anywhere where this method is used. I think it should be
> > removed, no?
> >
> > If this should be kept, it should be used and javadoc'd.
done
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java, line 58
> > <https://reviews.apache.org/r/22215/diff/3/?file=623707#file623707line58>
> >
> > I can't find anywhere where this method is used. I think it should be
> > removed, no?
> >
> > If this should be kept, it should be used and javadoc'd.
done
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java,
> > line 55
> > <https://reviews.apache.org/r/22215/diff/3/?file=623708#file623708line55>
> >
> > This is kafka-specific documentation. A file-based checkpoint manager
> > has no log, for example.
done.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java,
> > line 57
> > <https://reviews.apache.org/r/22215/diff/3/?file=623708#file623708line57>
> >
> > Seems like it should return an empty map if there were no messages.
done.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java,
> > line 61
> > <https://reviews.apache.org/r/22215/diff/3/?file=623708#file623708line61>
> >
> > Javadoc.
done.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java,
> > line 66
> > <https://reviews.apache.org/r/22215/diff/3/?file=623708#file623708line66>
> >
> > Recommend removing. There is nothing that says a checkpoint manager
> > must have a log. I think this should just be done in the existing start()
> > method for checkpoint managers that require some initialization.
done.
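For reference, here is a minimal sketch of a checkpoint-manager interface along the lines the comments above ask for. It has no log-specific method, does any backend setup in start(), and returns an empty map (never null) when no changelog mapping has been written. The method names mirror the ones discussed in this review. The types are simplified stand-ins: offsets are plain strings, and readLastCheckpoint returns Option rather than a nullable Checkpoint. InMemoryCheckpointManager is hypothetical and shows that the interface does not assume a log.

```scala
import scala.collection.mutable

// Simplified stand-ins for Samza's TaskName and Checkpoint (assumption: SSPs keyed as strings).
final case class TaskName(name: String)
final case class Checkpoint(offsets: Map[String, String])

trait SimpleCheckpointManager {
  // Backend-specific initialization (e.g. creating a Kafka topic) belongs here.
  def start(): Unit
  def register(taskName: TaskName): Unit
  def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint): Unit
  def readLastCheckpoint(taskName: TaskName): Option[Checkpoint]
  // Empty map, never null, when nothing has been written yet.
  def readChangeLogPartitionMapping(): Map[TaskName, Int]
  def writeChangeLogPartitionMapping(mapping: Map[TaskName, Int]): Unit
  def stop(): Unit
}

// Hypothetical in-memory implementation: no log anywhere.
class InMemoryCheckpointManager extends SimpleCheckpointManager {
  private val checkpoints = mutable.Map.empty[TaskName, Checkpoint]
  private var changeLogMapping = Map.empty[TaskName, Int]

  def start(): Unit = ()
  def register(taskName: TaskName): Unit = ()
  def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint): Unit =
    checkpoints(taskName) = checkpoint
  def readLastCheckpoint(taskName: TaskName): Option[Checkpoint] = checkpoints.get(taskName)
  def readChangeLogPartitionMapping(): Map[TaskName, Int] = changeLogMapping
  def writeChangeLogPartitionMapping(mapping: Map[TaskName, Int]): Unit =
    changeLogMapping = mapping
  def stop(): Unit = ()
}
```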
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouperFactory.java,
> > line 27
> > <https://reviews.apache.org/r/22215/diff/3/?file=623711#file623711line27>
> >
> > getSystemStreamPartitionGrouper
done.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-api/src/main/java/org/apache/samza/container/TaskName.java, line 21
> > <https://reviews.apache.org/r/22215/diff/3/?file=623712#file623712line21>
> >
> > Javadoc.
done.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java, line 39
> > <https://reviews.apache.org/r/22215/diff/3/?file=623713#file623713line39>
> >
> > Can we just have setSystemStreamPartitions here?
done
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala,
> > line 33
> > <https://reviews.apache.org/r/22215/diff/3/?file=623715#file623715line33>
> >
> > Per-offline discussion, it'd be good to have this fixed up again.
Will do in a follow up iteration.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala,
> > line 158
> > <https://reviews.apache.org/r/22215/diff/3/?file=623716#file623716line158>
> >
> > Is this necessary? If so, should be a TODO. Else remove.
Done.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala,
> > line 161
> > <https://reviews.apache.org/r/22215/diff/3/?file=623716#file623716line161>
> >
> > What do you think about just register(taskName,
> > Set[SystemStreamPartition]) here? I think we have this pattern in some
> > other places.
Done.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala,
> > line 205
> > <https://reviews.apache.org/r/22215/diff/3/?file=623716#file623716line205>
> >
> > getOrElse(..., throw new SamzaException())
I don't understand this comment.
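The suggestion appears to be the standard Scala idiom of passing a throwing expression as the by-name default of Map.getOrElse. A missing key then fails with a descriptive error instead of a bare NoSuchElementException from apply or Option.get. Below is a minimal sketch that assumes string keys and offsets for brevity. lastProcessedOffset is a hypothetical name, and SamzaException is redefined locally so the snippet compiles on its own.

```scala
// Local stand-in for org.apache.samza.SamzaException.
class SamzaException(message: String) extends RuntimeException(message)

object OffsetLookup {
  // `throw` has type Nothing, so it type-checks as the default and only runs when the key is absent.
  def lastProcessedOffset(offsets: Map[String, String], ssp: String): String =
    offsets.getOrElse(ssp, throw new SamzaException("No offset found for " + ssp))
}
```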
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala,
> > line 283
> > <https://reviews.apache.org/r/22215/diff/3/?file=623716#file623716line283>
> >
> > Do we need all of this? We import scala.collection.JavaConversions._
> > above
done.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala, line 37
> > <https://reviews.apache.org/r/22215/diff/3/?file=623718#file623718line37>
> >
> > Not a fan of leaking ssp into user facing interfaces and config names.
> >
> > job.partition.grouper.factory?
> > job.grouper.factory?
> > job.partitioner.factory?
Changed to job.systemstreampartition.grouper.factory.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala, line 53
> > <https://reviews.apache.org/r/22215/diff/3/?file=623718#file623718line53>
> >
> > Unless my eyes are getting messed up, aren't these two methods
> > identical?
done.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala,
> > line 31
> > <https://reviews.apache.org/r/22215/diff/3/?file=623719#file623719line31>
> >
> > I think I had a previous comment on this. Maybe I'm contradicting my
> > previous statement, but this env name is confusing. I think it should be
> > ENV_TASK_NAME_TO_CHANGELOG_PARTITION_MAPPING
done.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala, line 52
> > <https://reviews.apache.org/r/22215/diff/3/?file=623720#file623720line52>
> >
> > The complexity of this code bothers me. I don't like the amount of
> > abbreviated acronyms combined with the functional coding combined with the
> > weird spacing (some spaces after colons/some not, some spaces next to
> > parens/some not, some brackets/some parens).
Expanded all the abbreviations, made spacing consistent.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala, line 37
> > <https://reviews.apache.org/r/22215/diff/3/?file=623720#file623720line37>
> >
> > Can we just have a Set[TaskInstance] instead, since TaskInstance has
> > taskName in it?
We do multiple look-ups based on the taskname here, so we'd have to change
those to filter through the set to find that particular taskname...
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala, line 90
> > <https://reviews.apache.org/r/22215/diff/3/?file=623720#file623720line90>
> >
> > This newline bugs me.
I think I found the one you're talking about...
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala,
> > line 75
> > <https://reviews.apache.org/r/22215/diff/3/?file=623721#file623721line75>
> >
> > Delete newline.
done.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala,
> > line 92
> > <https://reviews.apache.org/r/22215/diff/3/?file=623721#file623721line92>
> >
> > This is complicated and hard to read. I know the rest of this class is
> > kind of a mess, but I want to be mindful not to make it worse.
> >
> > 1. deserialize vs decode.
> > 2. more comments.
> > 3. inconsistent variable naming patterns btwn state log variables and
> > ssp variables (taskNameToSSPMapping?).
done.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala,
> > line 399
> > <https://reviews.apache.org/r/22215/diff/3/?file=623721#file623721line399>
> >
> > Can we yank the new Partition() stuff into a separate line?
done.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala,
> > line 426
> > <https://reviews.apache.org/r/22215/diff/3/?file=623721#file623721line426>
> >
> > Can we yank the new Partition() stuff into a separate line?
done
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala,
> > line 437
> > <https://reviews.apache.org/r/22215/diff/3/?file=623721#file623721line437>
> >
> > Can we re-use the new Partition() from above?
done.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala,
> > line 439
> > <https://reviews.apache.org/r/22215/diff/3/?file=623721#file623721line439>
> >
> > More descriptive exception message.
done.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala,
> > line 441
> > <https://reviews.apache.org/r/22215/diff/3/?file=623721#file623721line441>
> >
> > Before this was SystemStream, but now it's SystemStreamPartitions. I
> > think it should be "Retrieved SystemStreamPartitions..."
done.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala,
> > line 47
> > <https://reviews.apache.org/r/22215/diff/3/?file=623723#file623723line47>
> >
> > Can we just do ._ instead?
Done.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala,
> > line 51
> > <https://reviews.apache.org/r/22215/diff/3/?file=623723#file623723line51>
> >
> > Not really a fan of this. The only place this is used is in the
> > RunLoop, and the run loop actually already has the TaskName:TaskInstance
> > map itself. I'd rather have one or the other, but not both. I mentioned
> > this above, as well. My personal preference is no val here, and keep the
> > map in the RunLoop. On top of this, I think you can use
> > taskInstance.context.getTaskName.
This value is used 28 times within the class. I'm not sure I follow. Let's
talk offline.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala,
> > line 60
> > <https://reviews.apache.org/r/22215/diff/3/?file=623723#file623723line60>
> >
> > Is this used anywhere outside of TaskInstance?
Yes, it's used by the RunLoop to build the map of SSP to TaskInstances.
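The reply describes the RunLoop turning the task-name-to-instance map into a map from SystemStreamPartition to TaskInstance. Below is a minimal sketch of that inversion. It uses simplified case-class stand-ins for Samza's types, and RunLoopWiring.sspToTaskInstance is a hypothetical helper name.

```scala
// Simplified stand-ins for Samza's types.
final case class SystemStreamPartition(system: String, stream: String, partition: Int)
final case class TaskName(name: String)
final case class TaskInstance(taskName: TaskName, systemStreamPartitions: Set[SystemStreamPartition])

object RunLoopWiring {
  // Invert taskName -> instance into ssp -> instance so an incoming envelope's
  // SSP can be routed directly to the task that owns it.
  def sspToTaskInstance(
      taskInstances: Map[TaskName, TaskInstance]): Map[SystemStreamPartition, TaskInstance] = {
    val pairs = taskInstances.values.toSeq.flatMap { instance =>
      instance.systemStreamPartitions.toSeq.map(_ -> instance)
    }
    val result = pairs.toMap
    // Each SSP must belong to exactly one task; toMap would otherwise silently drop a duplicate.
    require(result.size == pairs.size, "A SystemStreamPartition is assigned to more than one task")
    result
  }
}
```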
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/TaskNamesToSystemStreamPartitions.scala,
> > line 24
> > <https://reviews.apache.org/r/22215/diff/3/?file=623725#file623725line24>
> >
> > Javadocs.
done.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/ssp/groupers/GroupByPartition.scala,
> > line 28
> > <https://reviews.apache.org/r/22215/diff/3/?file=623726#file623726line28>
> >
> > Javadoc.
done.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/ssp/groupers/GroupBySystemStreamPartition.scala,
> > line 28
> > <https://reviews.apache.org/r/22215/diff/3/?file=623727#file623727line28>
> >
> > Javadoc.
done.
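A sketch of the behavior the two grouper names suggest may help when writing their Javadoc. It assumes a simplified grouper interface, in which group maps a set of SystemStreamPartitions to TaskName -> Set[SystemStreamPartition]. It also uses illustrative task-name formats. Neither the interface nor the formats are taken from the patch.

```scala
final case class SystemStreamPartition(system: String, stream: String, partition: Int)
final case class TaskName(name: String)

// Simplified grouper interface: split a job's input SSPs into tasks.
trait SystemStreamPartitionGrouper {
  def group(ssps: Set[SystemStreamPartition]): Map[TaskName, Set[SystemStreamPartition]]
}

// All SSPs sharing a partition number land in the same task (e.g. for co-partitioned joins).
object GroupByPartition extends SystemStreamPartitionGrouper {
  def group(ssps: Set[SystemStreamPartition]): Map[TaskName, Set[SystemStreamPartition]] =
    ssps.groupBy(_.partition).map { case (partition, group) =>
      TaskName("Partition " + partition) -> group
    }
}

// Every SSP gets its own task, maximizing parallelism.
object GroupBySystemStreamPartition extends SystemStreamPartitionGrouper {
  def group(ssps: Set[SystemStreamPartition]): Map[TaskName, Set[SystemStreamPartition]] =
    ssps.map(ssp => TaskName(s"${ssp.system}.${ssp.stream}.${ssp.partition}") -> Set(ssp)).toMap
}
```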
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala,
> > line 32
> > <https://reviews.apache.org/r/22215/diff/3/?file=623729#file623729line32>
> >
> > Serialize vs. encode inconsistency bugs me.
all serialize now.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala,
> > line 33
> > <https://reviews.apache.org/r/22215/diff/3/?file=623729#file623729line33>
> >
> > Can you just do .mapValues(_.toInt)?
No, both are necessary to go from a Java map to a Scala map. Scala won't
auto-convert the contents of the map (the ints), so both steps are needed.
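The two steps in the reply can be shown in isolation. Wrapping a Java map as a Scala map changes only the container. The java.lang.Integer values stay boxed, so a separate value conversion is still needed to get Map[..., Int]. The reviewer's mapValues(_.toInt) is that second step. The sketch below assumes Scala 2.13's scala.jdk.CollectionConverters and makes the first step explicit. With the older scala.collection.JavaConversions._ import, that first step would happen implicitly. MappingConversion.toScalaMapping is a hypothetical name.

```scala
import scala.jdk.CollectionConverters._

object MappingConversion {
  // Java map with boxed values -> immutable Scala map with primitive Ints.
  def toScalaMapping(javaMapping: java.util.Map[String, java.lang.Integer]): Map[String, Int] =
    javaMapping.asScala                                                      // step 1: container
      .map { case (taskName, partition) => taskName -> partition.intValue } // step 2: values
      .toMap
}
```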
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala,
> > line 37
> > <https://reviews.apache.org/r/22215/diff/3/?file=623730#file623730line37>
> >
> > What about "local-container"?
done.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala,
> > line 52
> > <https://reviews.apache.org/r/22215/diff/3/?file=623730#file623730line52>
> >
> > Can we do mapValues(Integer.valueOf) here?
That would lose the original mapping, which we need...
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala,
> > line 85
> > <https://reviews.apache.org/r/22215/diff/3/?file=623730#file623730line85>
> >
> > Remove white space.
done.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala,
> > line 39
> > <https://reviews.apache.org/r/22215/diff/3/?file=623731#file623731line39>
> >
> > Yeah, this class is kind of a mess. I think that we should refactor
> > this in a separate ticket. Could you open a follow on JIRA? We can discuss
> > the best way to do so there.
will do.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala,
> > line 52
> > <https://reviews.apache.org/r/22215/diff/3/?file=623731#file623731line52>
> >
> > Some docs about the checkpoint format would be helpful.
done.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala,
> > line 69
> > <https://reviews.apache.org/r/22215/diff/3/?file=623731#file623731line69>
> >
> > Why not:
> >
> > [
> > {
> > "system": "kafka",
> > "stream": "foo",
> > "partition": 4,
> > "offset": 1234
> > },
> > ...
> > ]
> >
done.
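Below is a dependency-free sketch of the array-of-objects layout suggested above. OffsetEntry and CheckpointJson are hypothetical names. Offsets are modeled as Long only to match the example, although Samza offsets may be opaque strings. The hand-rolled escaping covers quotes and backslashes only; a real serde would use a JSON library.

```scala
// One entry per system-stream-partition, shaped like the example above.
final case class OffsetEntry(system: String, stream: String, partition: Int, offset: Long)

object CheckpointJson {
  // Minimal escaping: backslashes first, then double quotes.
  private def quote(s: String): String =
    "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

  // Render entries as a JSON array of flat objects.
  def toJson(entries: Seq[OffsetEntry]): String =
    entries.map { e =>
      s"""{"system":${quote(e.system)},"stream":${quote(e.stream)},"partition":${e.partition},"offset":${e.offset}}"""
    }.mkString("[", ",", "]")
}
```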
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala,
> > line 95
> > <https://reviews.apache.org/r/22215/diff/3/?file=623731#file623731line95>
> >
> > putAll?
done.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala,
> > line 98
> > <https://reviews.apache.org/r/22215/diff/3/?file=623731#file623731line98>
> >
> > Delete empty line.
done.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 117
> > <https://reviews.apache.org/r/22215/diff/3/?file=623734#file623734line117>
> >
> > You mean which taskNames go to which containers?
Done.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 122
> > <https://reviews.apache.org/r/22215/diff/3/?file=623734#file623734line122>
> >
> > This method is confusingly named. Shouldn't it be like
> > assignTaskNamesToContainers?
> >
> > Can you move this to TaskNamesToSystemStreamPartitions.apply()? The
> > pattern I've been moving toward is to have wiring be done as apply()
> > methods in companion objects. See DefaultChooser as an example. The idea is
> > that it keeps SamzaContainer.apply from getting any worse, and keeps the
> > wiring close to the class that the wiring is instantiating.
This isn't used by TaskNamesToSystemStreamPartitions (TNTSSP) directly; it's
used in the command builders. At that point the TNTSSPs have already been created...
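The reply notes that this step runs in the command builders, after the task names already exist. It therefore maps task names to containers rather than building TaskNamesToSystemStreamPartitions. The sketch below uses the reviewer's suggested name, assignTaskNamesToContainers. Its round-robin strategy is an assumption; the patch may distribute tasks differently.

```scala
final case class TaskName(name: String)

object ContainerAssignment {
  // Hypothetical round-robin: sort for a deterministic result, then deal task names
  // out to containers 0 until containerCount. Containers left with no task are absent from the map.
  def assignTaskNamesToContainers(taskNames: Set[TaskName], containerCount: Int): Map[Int, Set[TaskName]] = {
    require(containerCount > 0, "containerCount must be positive")
    taskNames.toSeq
      .sortBy(_.name)
      .zipWithIndex
      .groupBy { case (_, index) => index % containerCount }
      .map { case (containerId, assigned) => containerId -> assigned.map(_._1).toSet }
  }
}
```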
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 130
> > <https://reviews.apache.org/r/22215/diff/3/?file=623734#file623734line130>
> >
> > cohorts. SystemStreamPartition groups?
done.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 141
> > <https://reviews.apache.org/r/22215/diff/3/?file=623734#file623734line141>
> >
> > Can't we just do TaskNamesToSystemStreamPartitions(groups) here?
Not sure I understand. The sspTaskNamesAsJava is defined by the closure that
ends in groups, in order to not spew a bunch of local, intermediate variables
into the method and to more easily delineate the work. That's then what's fed
to the TNTSSP...
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 201
> > <https://reviews.apache.org/r/22215/diff/3/?file=623734#file623734line201>
> >
> > What do you think about moving all four of these methods
> > (serde/deserde/encode/decode) into a ShellCommandBuilder companion object?
Done.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 225
> > <https://reviews.apache.org/r/22215/diff/3/?file=623734#file623734line225>
> >
> > serialize instead of encode. Name could be shorter, too.
Done.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 233
> > <https://reviews.apache.org/r/22215/diff/3/?file=623734#file623734line233>
> >
> > deserialize instead of decode. Name could be shorter, too.
Done.
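Below is a sketch of serialize/deserialize helpers living in a ShellCommandBuilder object, as suggested, for passing a taskName -> changelog partition mapping through an environment variable. In the patch this would be the companion of the ShellCommandBuilder class. Task names are plain strings here for brevity. The URL-encoded name=partition&... format is an assumption chosen to keep the sketch dependency-free; the patch may well use JSON.

```scala
import java.net.{URLDecoder, URLEncoder}

// Hypothetical helpers; in the patch this would be the companion of the builder class.
object ShellCommandBuilder {
  private val Encoding = "UTF-8"

  // URL-encode names so the '=' and '&' delimiters can never appear inside them.
  def serialize(mapping: Map[String, Int]): String =
    mapping.toSeq.sortBy(_._1)
      .map { case (taskName, partition) => URLEncoder.encode(taskName, Encoding) + "=" + partition }
      .mkString("&")

  def deserialize(serialized: String): Map[String, Int] =
    if (serialized.isEmpty) Map.empty
    else serialized.split("&").map { pair =>
      // Encoded names contain no '=', so the first '=' is the separator.
      val separator = pair.indexOf('=')
      URLDecoder.decode(pair.substring(0, separator), Encoding) -> pair.substring(separator + 1).toInt
    }.toMap
}
```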
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 249
> > <https://reviews.apache.org/r/22215/diff/3/?file=623734#file623734line249>
> >
> > For Kafka, do we have to run a topic partition expansion on the
> > changelog partition count in order for this to work?
Only if they add new partitions, which is the same behavior as now.
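Per the reply, the changelog partition count only needs to grow when new partitions, and therefore new task names, appear. Below is a minimal sketch of resolving the mapping under three assumptions. Partitions are plain Ints. New task names are numbered after the highest partition already in use. Entries for task names that disappear are kept, so a partition is never reused. ChangelogPartitions.resolve is a hypothetical name. An empty existing mapping (nothing written yet) simply starts numbering at 0.

```scala
final case class TaskName(name: String)

object ChangelogPartitions {
  // Keep existing assignments stable; give unassigned task names fresh partitions.
  def resolve(existing: Map[TaskName, Int], currentTaskNames: Set[TaskName]): Map[TaskName, Int] = {
    val newTaskNames = (currentTaskNames -- existing.keySet).toSeq.sortBy(_.name)
    val nextPartition = if (existing.isEmpty) 0 else existing.values.max + 1
    val added = newTaskNames.zipWithIndex.map { case (taskName, i) => taskName -> (nextPartition + i) }
    existing ++ added
  }
}
```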
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 296
> > <https://reviews.apache.org/r/22215/diff/3/?file=623734#file623734line296>
> >
> > Javadocs.
done.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 305
> > <https://reviews.apache.org/r/22215/diff/3/?file=623734#file623734line305>
> >
> > null for metrics registry? This seems dangerous, and breaks our
> > existing pattern.
done.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 313
> > <https://reviews.apache.org/r/22215/diff/3/?file=623734#file623734line313>
> >
> > white space.
done.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 319
> > <https://reviews.apache.org/r/22215/diff/3/?file=623734#file623734line319>
> >
> > white space.
done.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 331
> > <https://reviews.apache.org/r/22215/diff/3/?file=623734#file623734line331>
> >
> > What if newMapping == null?
Fixed by the semantics change above.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala,
> > line 61
> > <https://reviews.apache.org/r/22215/diff/3/?file=623749#file623749line61>
> >
> > Add a note about the topic being a single partition
done.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala,
> > line 88
> > <https://reviews.apache.org/r/22215/diff/3/?file=623749#file623749line88>
> >
> > Put these in a companion object and use javadocs to document
done.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala,
> > line 268
> > <https://reviews.apache.org/r/22215/diff/3/?file=623749#file623749line268>
> >
> > "taskName to checkpoint mapping"
done.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala,
> > line 271
> > <https://reviews.apache.org/r/22215/diff/3/?file=623749#file623749line271>
> >
> > "Already existing checkpoint mapping"
done.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala,
> > line 301
> > <https://reviews.apache.org/r/22215/diff/3/?file=623749#file623749line301>
> >
> > Given that you're reading through the entire checkpoint topic for
> > partition 0, we should set a smaller segment size and enable log
> > compaction here. The smaller segment size will allow more of the topic to
> > be compacted, once compaction is enabled. This should drastically speed up
> > container startup time.
Should that be done here or by the SREs/Ops? Users may wish to keep a larger
log for various reasons...
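One way to reconcile both views is to apply compaction-friendly defaults when the checkpoint topic is created, while letting operator-supplied config override them. cleanup.policy and segment.bytes are standard Kafka topic configs. Compaction keeps the latest message per key and never touches the active segment, so smaller segments let more of the log be compacted sooner. This assumes checkpoint messages are keyed (for example per taskName). The 26 MB segment size and the CheckpointTopicConfig name are illustrative only.

```scala
import java.util.Properties

object CheckpointTopicConfig {
  // Illustrative defaults, not tuned values.
  val Defaults: Map[String, String] = Map(
    "cleanup.policy" -> "compact",
    "segment.bytes" -> (26 * 1024 * 1024).toString
  )

  // Operator overrides win, so SREs can still keep a larger log if they want one.
  def topicProperties(overrides: Map[String, String]): Properties = {
    val props = new Properties()
    (Defaults ++ overrides).foreach { case (key, value) => props.setProperty(key, value) }
    props
  }
}
```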
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala,
> > line 96
> > <https://reviews.apache.org/r/22215/diff/3/?file=623750#file623750line96>
> >
> > Maybe we should change the prefix (add a version number?) so that we
> > don't accidentally use this new checkpoint manager with a legacy topic?
added version number. Great idea.
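A sketch of what a versioned checkpoint topic name could look like. The __samza_checkpoint_ver_%d_for_%s_%s pattern and the CheckpointLogVersion constant are assumptions for illustration, not necessarily what the patch uses. Because the version is part of the name, a manager built for the new format looks for a different topic than any legacy, unversioned one.

```scala
object CheckpointTopic {
  // Hypothetical version constant; bump it whenever the checkpoint format changes incompatibly.
  val CheckpointLogVersion = 1

  // Underscores in the job name/id become dashes so they can't be confused with the separators.
  def topicName(jobName: String, jobId: String): String =
    "__samza_checkpoint_ver_%d_for_%s_%s".format(
      CheckpointLogVersion, jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-"))
}
```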
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala,
> > line 77
> > <https://reviews.apache.org/r/22215/diff/3/?file=623758#file623758line77>
> >
> > Seems like some of this logic could be extracted out into a single
> > logic that we can share with the LocalJobFactory (and future job thingies
> > like Mesos). Maybe a follow on ticket?
Yeah, there's definite work to be done here.
> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala,
> > line 121
> > <https://reviews.apache.org/r/22215/diff/3/?file=623758#file623758line121>
> >
> > mapValues?
No, we need the full map, as above.
- Jakob
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/22215/#review47416
-----------------------------------------------------------
On July 7, 2014, 4:18 p.m., Jakob Homan wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/22215/
> -----------------------------------------------------------
>
> (Updated July 7, 2014, 4:18 p.m.)
>
>
> Review request for samza.
>
>
> Bugs: SAMZA-123
> https://issues.apache.org/jira/browse/SAMZA-123
>
>
> Repository: samza
>
>
> Description
> -------
>
> Move topic partition grouping to the AM and generalize
>
>
> Diffs
> -----
>
> .gitignore db9d3ec
> samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java 6fad1fa
> samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java a6e1ba6
> samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java 78d56a9
> samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouper.java PRE-CREATION
> samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouperFactory.java PRE-CREATION
> samza-api/src/main/java/org/apache/samza/container/TaskName.java PRE-CREATION
> samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java cb40092
> samza-api/src/main/java/org/apache/samza/task/TaskContext.java 7c1b085
> samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala 5735a39
> samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 9487b58
> samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala 364e489
> samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala fcafe83
> samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala 4c2d365
> samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 4ca340c
> samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 356adbb
> samza-core/src/main/scala/org/apache/samza/container/SystemStreamPartitionTaskNameGrouper.scala PRE-CREATION
> samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 99a9841
> samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala 7502124
> samza-core/src/main/scala/org/apache/samza/container/TaskNamesToSystemStreamPartitions.scala PRE-CREATION
> samza-core/src/main/scala/org/apache/samza/container/ssp/groupers/GroupByPartition.scala PRE-CREATION
> samza-core/src/main/scala/org/apache/samza/container/ssp/groupers/GroupBySystemStreamPartition.scala PRE-CREATION
> samza-core/src/main/scala/org/apache/samza/container/ssp/taskname/groupers/SimpleSystemStreamPartitionTaskNameGrouper.scala PRE-CREATION
> samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala f8865b1
> samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala e20e7c1
> samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala 3d0a484
> samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 7214151
> samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala 4ccd604
> samza-core/src/main/scala/org/apache/samza/util/Util.scala 11c23d0
> samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala bc54f9e
> samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 94f6f4c
> samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala 50d9a05
> samza-core/src/test/scala/org/apache/samza/container/SystemStreamPartitionGrouperTestBase.scala PRE-CREATION
> samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala fa10231
> samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 190bdfe
> samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 1f5e3bb
> samza-core/src/test/scala/org/apache/samza/container/ssp/groupers/TestGroupByPartition.scala PRE-CREATION
> samza-core/src/test/scala/org/apache/samza/container/ssp/groupers/TestGroupBySystemStreamPartition.scala PRE-CREATION
> samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala 21d8a78
> samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala 4f7ddcd
> samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala 70d8c80
> samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala 12f1e03
> samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala a67ecdf
> samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala 15245d4
> samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala cb6dbdf
> samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala 92ac61e
> samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala 6be9732
> samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala dae3c2c
> samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java 222c130
> samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala 0077af0
> samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala dc44a99
> samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml b2faebf
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala 01a2683
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala eb1ff54
> samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala 520f784
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala f1139f5
>
> Diff: https://reviews.apache.org/r/22215/diff/
>
>
> Testing
> -------
>
> Existing and new unit tests. Now moving on to functional testing.
>
>
> Thanks,
>
> Jakob Homan
>
>