-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/22215/#review47416
-----------------------------------------------------------
1. Could you update the docs on the JIRA to describe how the checkpointing
actually works now?
2. I know this is churn, but I'm not a fan of TaskName/taskName after seeing it
in use. I think TaskId/taskId is the way to go (per Sriram's comments in the
JIRA).
3. You refer to "StateLog" everywhere. I think the rest of the code calls this
a "Changelog". I think we should stick with changelog.
4. It seems like you have something funny going on with your whitespace. I see
{ foo => bar}, both (foo:Bar) and (foo: Bar), both foo => bar and foo=> bar,
etc. I'm guessing that this is because your formatter would totally nuke the
existing Eclipse formatting. Can you just run your formatter over the changes?
5. Is it ever possible for us to have two different changelog partitions for a
single taskName? Even if more than one store is defined for a job?
6. Not crazy about leaking the term SSP to config, logging, or public APIs.
You've introduced it all over the place. Prior to this, it wasn't exposed. I'd
rather avoid acronyms in favor of clarity wherever possible.
7. It seems like we need a follow-on ticket to introduce some common object
that the LocalJobFactory and SamzaAppMasterTaskManager can share for handling
task assignment. Right now, a lot of this is in Util
(getTaskNameToStateLogPartitionMapping, resolveTaskNameToPartitionMapping).
8. Lots of tests needed.
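For item 7, one possible shape for that shared object is sketched below. TaskNameAssignment, assignTaskNamesToContainers, and the round-robin policy are all hypothetical; the point is only that both job factories would call one deterministic function instead of Util helpers.

```scala
// Hypothetical stand-in for org.apache.samza.container.TaskName.
final case class TaskName(name: String)

// Hypothetical shared object that LocalJobFactory and SamzaAppMasterTaskManager could
// both call. Round-robin placement is an assumption chosen for illustration.
object TaskNameAssignment {
  /** Spread task names over containerCount containers, round-robin by sorted name. */
  def assignTaskNamesToContainers(
      taskNames: Set[TaskName],
      containerCount: Int): Map[Int, Set[TaskName]] = {
    require(containerCount > 0, s"containerCount must be positive, got $containerCount")
    taskNames.toSeq
      .sortBy(_.name) // deterministic order, so every caller computes the same assignment
      .zipWithIndex
      .groupBy { case (_, index) => index % containerCount }
      .map { case (containerId, tasks) => containerId -> tasks.map(_._1).toSet }
  }
}
```

Containers that receive no task name are simply absent from the returned map, so callers would need to decide whether that is an error.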
samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
<https://reviews.apache.org/r/22215/#comment83161>
Delete this?
samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
<https://reviews.apache.org/r/22215/#comment83157>
I can't find anywhere where this method is used. I think it should be
removed, no?
If this should be kept, it should be used and javadoc'd.
samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
<https://reviews.apache.org/r/22215/#comment83158>
I can't find anywhere where this method is used. I think it should be
removed, no?
If this should be kept, it should be used and javadoc'd.
samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
<https://reviews.apache.org/r/22215/#comment83164>
This is kafka-specific documentation. A file-based checkpoint manager has
no log, for example.
samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
<https://reviews.apache.org/r/22215/#comment83180>
Seems like it should return an empty map if there were no messages.
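A minimal sketch of the empty-map behavior, assuming the manager folds an ordered log of (taskName, checkpoint) records into a map. CheckpointReader and readLastCheckpoints are hypothetical names, and Checkpoint is simplified to offsets keyed by a plain string.

```scala
final case class TaskName(name: String)
// Simplified: the real Checkpoint keys offsets by SystemStreamPartition.
final case class Checkpoint(offsets: Map[String, String])

object CheckpointReader {
  /** Later records for a task overwrite earlier ones; an empty log yields Map.empty, never null. */
  def readLastCheckpoints(log: Iterator[(TaskName, Checkpoint)]): Map[TaskName, Checkpoint] =
    log.foldLeft(Map.empty[TaskName, Checkpoint]) { case (acc, (task, cp)) => acc + (task -> cp) }
}
```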
samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
<https://reviews.apache.org/r/22215/#comment83165>
Javadoc.
samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
<https://reviews.apache.org/r/22215/#comment83163>
Recommend removing. There is nothing that says a checkpoint manager must
have a log. I think this should just be done in the existing start() method for
checkpoint managers that require some initialization.
samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouperFactory.java
<https://reviews.apache.org/r/22215/#comment83166>
getSystemStreamPartitionGrouper
samza-api/src/main/java/org/apache/samza/container/TaskName.java
<https://reviews.apache.org/r/22215/#comment83167>
Javadoc.
samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
<https://reviews.apache.org/r/22215/#comment83168>
Can we just have setSystemStreamPartitions here?
samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
<https://reviews.apache.org/r/22215/#comment83169>
Can we just have setStateLogPartitions here?
samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
<https://reviews.apache.org/r/22215/#comment83170>
Per our offline discussion, it'd be good to have this fixed up again.
samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
<https://reviews.apache.org/r/22215/#comment83181>
Is this necessary? If so, it should be a TODO. Otherwise, remove it.
samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
<https://reviews.apache.org/r/22215/#comment83207>
What do you think about just register(taskName, Set[SystemStreamPartition])
here? I think we have this pattern in some other places.
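Roughly what that signature could look like, as a hypothetical, stripped-down OffsetManager (not the class under review) with simplified TaskName and SystemStreamPartition case classes:

```scala
import scala.collection.mutable

final case class TaskName(name: String)
final case class SystemStreamPartition(system: String, stream: String, partition: Int)

// Hypothetical sketch of the suggested API only; offset tracking is omitted.
class OffsetManager {
  private val registered = mutable.Map[TaskName, Set[SystemStreamPartition]]()

  /** Register every SystemStreamPartition a task consumes in one call. */
  def register(taskName: TaskName, ssps: Set[SystemStreamPartition]): Unit =
    registered(taskName) = registered.getOrElse(taskName, Set.empty) ++ ssps

  def systemStreamPartitions(taskName: TaskName): Set[SystemStreamPartition] =
    registered.getOrElse(taskName, Set.empty)
}
```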
samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
<https://reviews.apache.org/r/22215/#comment83171>
getOrElse(..., throw new SamzaException())
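A small illustration of the suggested idiom, using hypothetical stand-ins for SamzaException and TaskName. Because getOrElse takes its default by name, the exception is only constructed when the key is missing, and throw has type Nothing, so the expression still returns a String.

```scala
// Hypothetical stand-in for org.apache.samza.SamzaException.
class SamzaException(message: String) extends RuntimeException(message)

final case class TaskName(name: String)

object OffsetLookup {
  def lastProcessedOffset(offsets: Map[TaskName, String], taskName: TaskName): String =
    offsets.getOrElse(taskName, throw new SamzaException(s"No offset registered for $taskName"))
}
```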
samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
<https://reviews.apache.org/r/22215/#comment83172>
Do we need all of this? We already import scala.collection.JavaConversions._ above.
samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
<https://reviews.apache.org/r/22215/#comment83183>
Not a fan of leaking ssp into user facing interfaces and config names.
job.partition.grouper.factory?
job.grouper.factory?
job.partitioner.factory?
samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
<https://reviews.apache.org/r/22215/#comment83185>
Unless my eyes are getting messed up, aren't these two methods identical?
samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
<https://reviews.apache.org/r/22215/#comment83184>
I think I had a previous comment on this. Maybe I'm contradicting my
previous statement, but this env name is confusing. I think it should be
ENV_TASK_NAME_TO_CHANGELOG_PARTITION_MAPPING
samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
<https://reviews.apache.org/r/22215/#comment83187>
Can we just have a Set[TaskInstance] instead, since TaskInstance has
taskName in it?
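A sketch of a RunLoop that takes a Set[TaskInstance] and derives the name lookup from each instance's context, assuming trimmed-down stand-ins for TaskInstance and TaskContext (the real classes take many more arguments). The require guards against two instances reporting the same task name, which a plain toMap would otherwise hide.

```scala
final case class TaskName(name: String)

// Hypothetical, trimmed-down stand-ins for the real classes.
final class TaskContext(taskName: TaskName) { def getTaskName: TaskName = taskName }
final class TaskInstance(val context: TaskContext) {
  def process(message: String): String = s"${context.getTaskName.name} handled $message"
}

// The run loop receives only the instances; names come from each instance's context.
final class RunLoop(taskInstances: Set[TaskInstance]) {
  private val byName: Map[TaskName, TaskInstance] =
    taskInstances.map(t => t.context.getTaskName -> t).toMap
  require(byName.size == taskInstances.size, "Two task instances share a task name")

  def dispatch(taskName: TaskName, message: String): Option[String] =
    byName.get(taskName).map(_.process(message))
}
```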
samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
<https://reviews.apache.org/r/22215/#comment83186>
The complexity of this code bothers me. I don't like the amount of
abbreviated acronyms combined with the functional coding combined with the
weird spacing (some spaces after colons/some not, some spaces next to
parens/some not, some brackets/some parens).
samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
<https://reviews.apache.org/r/22215/#comment83189>
This newline bugs me.
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/22215/#comment83194>
Delete newline.
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/22215/#comment83195>
This is complicated and hard to read. I know the rest of this class is kind
of a mess, but I want to be mindful not to make it worse.
1. deserialize vs decode.
2. more comments.
3. inconsistent variable naming patterns between state log variables and ssp
variables (taskNameToSSPMapping?).
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/22215/#comment83200>
Can we yank the new Partition() stuff into a separate line?
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/22215/#comment83199>
Can we yank the new Partition() stuff into a separate line?
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/22215/#comment83201>
Can we re-use the new Partition() from above?
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/22215/#comment83203>
More descriptive exception message.
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/22215/#comment83202>
Before this was SystemStream, but now it's SystemStreamPartitions. I think
it should be "Retrieved SystemStreamPartitions..."
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
<https://reviews.apache.org/r/22215/#comment83204>
Can we just do ._ instead?
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
<https://reviews.apache.org/r/22215/#comment83206>
Not really a fan of this. The only place this is used is in the RunLoop,
and the run loop actually already has the TaskName:TaskInstance map itself. I'd
rather have one or the other, but not both. I mentioned this above, as well. My
personal preference is no val here, and keep the map in the RunLoop. On top of
this, I think you can use taskInstance.context.getTaskName.
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
<https://reviews.apache.org/r/22215/#comment83205>
Is this used anywhere outside of TaskInstance?
samza-core/src/main/scala/org/apache/samza/container/TaskNamesToSystemStreamPartitions.scala
<https://reviews.apache.org/r/22215/#comment83208>
Javadocs.
samza-core/src/main/scala/org/apache/samza/container/ssp/groupers/GroupByPartition.scala
<https://reviews.apache.org/r/22215/#comment83211>
Javadoc.
samza-core/src/main/scala/org/apache/samza/container/ssp/groupers/GroupBySystemStreamPartition.scala
<https://reviews.apache.org/r/22215/#comment83212>
Javadoc.
samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
<https://reviews.apache.org/r/22215/#comment83213>
Serialize vs. encode inconsistency bugs me.
samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
<https://reviews.apache.org/r/22215/#comment83214>
Can you just do .mapValues(_.toInt)?
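One caveat worth keeping in mind with mapValues: on Scala 2.12 and earlier it returns a lazy view that re-applies the function on every lookup, and on 2.13+ it is deprecated and returns a lazy MapView. Mapping over the pairs gives a strict Map on any version. A minimal sketch, assuming the input is a decoded map of task name to partition strings (PartitionParsing is a hypothetical name):

```scala
object PartitionParsing {
  // Converts every value exactly once and returns a strict Map.
  def toPartitionNumbers(raw: Map[String, String]): Map[String, Int] =
    raw.map { case (taskName, partition) => taskName -> partition.toInt }
}
```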
samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala
<https://reviews.apache.org/r/22215/#comment83215>
What about "local-container"?
samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala
<https://reviews.apache.org/r/22215/#comment83216>
Can we do mapValues(Integer.valueOf) here?
samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala
<https://reviews.apache.org/r/22215/#comment83217>
Remove white space.
samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
<https://reviews.apache.org/r/22215/#comment83218>
Yeah, this class is kind of a mess. I think that we should refactor this in
a separate ticket. Could you open a follow on JIRA? We can discuss the best way
to do so there.
samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
<https://reviews.apache.org/r/22215/#comment83220>
Some docs about the checkpoint format would be helpful.
samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
<https://reviews.apache.org/r/22215/#comment83219>
Why not:
[
{
"system": "kafka",
"stream": "foo",
"partition": 4,
"offset": 1234
},
...
]
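A rough sketch of encoding a checkpoint in that shape, without a JSON library so it stays self-contained. It assumes offsets are kept as opaque strings (so "offset" is emitted as a JSON string rather than the number shown above) and sorts entries so the output is stable. CheckpointJson is a hypothetical name.

```scala
final case class SystemStreamPartition(system: String, stream: String, partition: Int)

object CheckpointJson {
  // Minimal JSON string escaping: quotes, backslashes, and control characters.
  private def quote(s: String): String = {
    val sb = new StringBuilder("\"")
    s.foreach {
      case '"'          => sb.append("\\\"")
      case '\\'         => sb.append("\\\\")
      case c if c < ' ' => sb.append('\\').append('u').append(f"${c.toInt}%04x")
      case c            => sb.append(c)
    }
    sb.append('"').toString
  }

  /** Encode offsets as a JSON array with one object per SystemStreamPartition. */
  def toJson(offsets: Map[SystemStreamPartition, String]): String =
    offsets.toSeq
      .sortBy { case (ssp, _) => (ssp.system, ssp.stream, ssp.partition) }
      .map { case (ssp, offset) =>
        val fields = Seq(
          "\"system\":" + quote(ssp.system),
          "\"stream\":" + quote(ssp.stream),
          "\"partition\":" + ssp.partition,
          "\"offset\":" + quote(offset))
        fields.mkString("{", ",", "}")
      }
      .mkString("[", ",", "]")
}
```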
samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
<https://reviews.apache.org/r/22215/#comment83221>
putAll?
samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
<https://reviews.apache.org/r/22215/#comment83222>
Delete empty line.
samza-core/src/main/scala/org/apache/samza/util/Util.scala
<https://reviews.apache.org/r/22215/#comment83228>
You mean which taskNames go to which containers?
samza-core/src/main/scala/org/apache/samza/util/Util.scala
<https://reviews.apache.org/r/22215/#comment83229>
This method is confusingly named. Shouldn't it be something like
assignTaskNamesToContainers?
Can you move this to TaskNamesToSystemStreamPartitions.apply()? The pattern
I've been moving toward is to have wiring be done as apply() methods in
companion objects. See DefaultChooser as an example. The idea is that it keeps
SamzaContainer.apply from getting any worse, and keeps the wiring close to the
class that the wiring is instantiating.
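A minimal sketch of the companion-object wiring pattern, using simplified, hypothetical versions of TaskName, SystemStreamPartition, and TaskNamesToSystemStreamPartitions. The check that no SystemStreamPartition is assigned to two task names is an assumed invariant, included to show that validation can live next to the class as well.

```scala
final case class TaskName(name: String)
final case class SystemStreamPartition(system: String, stream: String, partition: Int)

// Hypothetical, simplified version of the class named in the review.
final class TaskNamesToSystemStreamPartitions private (
    mapping: Map[TaskName, Set[SystemStreamPartition]]) {
  def apply(taskName: TaskName): Set[SystemStreamPartition] = mapping.getOrElse(taskName, Set.empty)
  def taskNames: Set[TaskName] = mapping.keySet
}

object TaskNamesToSystemStreamPartitions {
  /** Wiring lives here, next to the class it builds, instead of in SamzaContainer.apply or Util. */
  def apply(groups: Map[TaskName, Set[SystemStreamPartition]]): TaskNamesToSystemStreamPartitions = {
    // Assumed invariant: each SystemStreamPartition belongs to exactly one task name.
    val all = groups.values.toSeq.flatten
    require(all.size == all.distinct.size, "A SystemStreamPartition was assigned to more than one task name")
    new TaskNamesToSystemStreamPartitions(groups)
  }
}
```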
samza-core/src/main/scala/org/apache/samza/util/Util.scala
<https://reviews.apache.org/r/22215/#comment83230>
cohorts. SystemStreamPartition groups?
samza-core/src/main/scala/org/apache/samza/util/Util.scala
<https://reviews.apache.org/r/22215/#comment83232>
Can't we just do TaskNamesToSystemStreamPartitions(groups) here?
samza-core/src/main/scala/org/apache/samza/util/Util.scala
<https://reviews.apache.org/r/22215/#comment83235>
What do you think about moving all four of these methods
(serialize/deserialize/encode/decode) into a ShellCommandBuilder companion object?
samza-core/src/main/scala/org/apache/samza/util/Util.scala
<https://reviews.apache.org/r/22215/#comment83233>
serialize instead of encode. Name could be shorter, too.
samza-core/src/main/scala/org/apache/samza/util/Util.scala
<https://reviews.apache.org/r/22215/#comment83234>
deserialize instead of decode. Name could be shorter, too.
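A sketch of what that companion object might contain, assuming the value being passed through the environment is a taskName-to-changelog-partition map. The name=partition,... wire format and the URL-encoding of names are assumptions chosen so that commas or equals signs inside a task name survive the round trip; they are not the format in the patch.

```scala
import java.net.{URLDecoder, URLEncoder}

final case class TaskName(name: String)

// Hypothetical companion holding the serialize/deserialize pair.
object ShellCommandBuilder {
  private val Utf8 = "UTF-8"

  /** Encode taskName -> changelog partition as "name=partition" pairs separated by commas. */
  def serialize(mapping: Map[TaskName, Int]): String =
    mapping.toSeq
      .sortBy { case (taskName, _) => taskName.name } // stable output for logs and tests
      .map { case (taskName, partition) => URLEncoder.encode(taskName.name, Utf8) + "=" + partition }
      .mkString(",")

  /** Inverse of serialize; URL-encoded names cannot contain a raw ',' or '='. */
  def deserialize(encoded: String): Map[TaskName, Int] =
    if (encoded.isEmpty) Map.empty
    else
      encoded.split(",").map { entry =>
        entry.split("=", 2) match {
          case Array(name, partition) => TaskName(URLDecoder.decode(name, Utf8)) -> partition.toInt
          case _ => throw new IllegalArgumentException(s"Malformed entry: $entry")
        }
      }.toMap
}
```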
samza-core/src/main/scala/org/apache/samza/util/Util.scala
<https://reviews.apache.org/r/22215/#comment83236>
For Kafka, do we have to run a topic partition expansion on the changelog
partition count in order for this to work?
samza-core/src/main/scala/org/apache/samza/util/Util.scala
<https://reviews.apache.org/r/22215/#comment83237>
Javadocs.
samza-core/src/main/scala/org/apache/samza/util/Util.scala
<https://reviews.apache.org/r/22215/#comment83238>
null for metrics registry? This seems dangerous, and breaks our existing
pattern.
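One way to avoid the null, sketched with a hypothetical, trimmed-down MetricsRegistry trait: default to a registry that hands out working metrics but reports them nowhere. If the surrounding code already constructs a MetricsRegistryMap, passing a fresh one would achieve the same thing. NoOpMetricsRegistry and ChangelogPartitionReader are illustrative names.

```scala
// Hypothetical, trimmed-down metrics interface; the real registry has more methods.
trait MetricsRegistry {
  def newCounter(group: String, name: String): Counter
}

final class Counter(val name: String) {
  private var count = 0L
  def inc(): Unit = count += 1
  def getCount: Long = count
}

// Hands out working counters that are never reported, so callers never see null.
object NoOpMetricsRegistry extends MetricsRegistry {
  def newCounter(group: String, name: String): Counter = new Counter(name)
}

// Hypothetical caller: the default argument keeps null out of every call site.
final class ChangelogPartitionReader(registry: MetricsRegistry = NoOpMetricsRegistry) {
  private val reads = registry.newCounter(getClass.getName, "reads")
  def read(): Long = { reads.inc(); reads.getCount }
}
```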
samza-core/src/main/scala/org/apache/samza/util/Util.scala
<https://reviews.apache.org/r/22215/#comment83240>
Whitespace.
samza-core/src/main/scala/org/apache/samza/util/Util.scala
<https://reviews.apache.org/r/22215/#comment83239>
Whitespace.
samza-core/src/main/scala/org/apache/samza/util/Util.scala
<https://reviews.apache.org/r/22215/#comment83241>
What if newMapping == null?
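Two reasonable answers, sketched with hypothetical names: treat a null mapping as empty, or fail fast with a descriptive message. Which one is right depends on whether null can legitimately come back from the checkpoint manager, which the patch would need to settle.

```scala
final case class TaskName(name: String)

object ChangelogMapping {
  /** Option 1: treat a null newMapping as "no new entries". */
  def mergeOrEmpty(existing: Map[TaskName, Int], newMapping: Map[TaskName, Int]): Map[TaskName, Int] =
    existing ++ Option(newMapping).getOrElse(Map.empty[TaskName, Int])

  /** Option 2: fail fast with a message that says what was missing. */
  def mergeOrFail(existing: Map[TaskName, Int], newMapping: Map[TaskName, Int]): Map[TaskName, Int] = {
    require(newMapping != null, "Changelog partition mapping was null")
    existing ++ newMapping
  }
}
```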
samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment83242>
Add a note that the topic has a single partition.
samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment83245>
Put these in a companion object and use javadocs to document them.
samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment83243>
"taskName to checkpoint mapping"
samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment83244>
"Already existing checkpoint mapping"
samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment83246>
Given that you're reading through the entire checkpoint topic for partition
0, we should set a smaller segment size and enable log compaction here. The
smaller segment size will allow more of the topic to be compacted, once
compaction is enabled. This should drastically speed up container startup time.
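For reference, the topic-level overrides involved would look roughly like this. cleanup.policy and segment.bytes are standard Kafka topic configs; the 25 MiB value and the CheckpointTopicConfig object are hypothetical. Note that compaction keeps only the latest message per key, so it only helps if each checkpoint message is keyed (for example, by taskName).

```scala
import java.util.Properties

object CheckpointTopicConfig {
  // Hypothetical value; smaller segments let more of the topic become eligible for compaction.
  val SegmentBytes: Int = 26214400 // 25 MiB

  /** Topic-level overrides for the single-partition checkpoint topic. */
  def properties(segmentBytes: Int = SegmentBytes): Properties = {
    val props = new Properties()
    props.setProperty("cleanup.policy", "compact") // keep only the latest message per key
    props.setProperty("segment.bytes", segmentBytes.toString) // the active segment is never compacted
    props
  }
}
```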
samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
<https://reviews.apache.org/r/22215/#comment83247>
Maybe we should change the prefix (add a version number?) so that we don't
accidentally use this new checkpoint manager with a legacy topic?
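A minimal sketch of a versioned topic name. The format string and the CheckpointTopic object are hypothetical; the idea is that bumping the version yields a different topic, so a new-format manager can never silently read a legacy topic.

```scala
object CheckpointTopic {
  // Hypothetical version tag; bump it whenever the checkpoint message format changes.
  val CurrentVersion = 1

  def topicName(jobName: String, jobId: String, version: Int = CurrentVersion): String =
    s"__samza_checkpoint_ver_${version}_for_${jobName}_$jobId"
}
```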
samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
<https://reviews.apache.org/r/22215/#comment83248>
Seems like some of this logic could be extracted into a single class
that we can share with the LocalJobFactory (and future job thingies like
Mesos). Maybe a follow on ticket?
samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
<https://reviews.apache.org/r/22215/#comment83249>
mapValues?
- Chris Riccomini
On July 7, 2014, 11:18 p.m., Jakob Homan wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/22215/
> -----------------------------------------------------------
>
> (Updated July 7, 2014, 11:18 p.m.)
>
>
> Review request for samza.
>
>
> Bugs: SAMZA-123
> https://issues.apache.org/jira/browse/SAMZA-123
>
>
> Repository: samza
>
>
> Description
> -------
>
> Move topic partition grouping to the AM and generalize
>
>
> Diffs
> -----
>
> .gitignore db9d3ec
> samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java 6fad1fa
> samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java a6e1ba6
> samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java 78d56a9
> samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouper.java PRE-CREATION
> samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouperFactory.java PRE-CREATION
> samza-api/src/main/java/org/apache/samza/container/TaskName.java PRE-CREATION
> samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java cb40092
> samza-api/src/main/java/org/apache/samza/task/TaskContext.java 7c1b085
> samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala 5735a39
> samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 9487b58
> samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala 364e489
> samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala fcafe83
> samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala 4c2d365
> samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 4ca340c
> samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 356adbb
> samza-core/src/main/scala/org/apache/samza/container/SystemStreamPartitionTaskNameGrouper.scala PRE-CREATION
> samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 99a9841
> samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala 7502124
> samza-core/src/main/scala/org/apache/samza/container/TaskNamesToSystemStreamPartitions.scala PRE-CREATION
> samza-core/src/main/scala/org/apache/samza/container/ssp/groupers/GroupByPartition.scala PRE-CREATION
> samza-core/src/main/scala/org/apache/samza/container/ssp/groupers/GroupBySystemStreamPartition.scala PRE-CREATION
> samza-core/src/main/scala/org/apache/samza/container/ssp/taskname/groupers/SimpleSystemStreamPartitionTaskNameGrouper.scala PRE-CREATION
> samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala f8865b1
> samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala e20e7c1
> samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala 3d0a484
> samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 7214151
> samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala 4ccd604
> samza-core/src/main/scala/org/apache/samza/util/Util.scala 11c23d0
> samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala bc54f9e
> samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 94f6f4c
> samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala 50d9a05
> samza-core/src/test/scala/org/apache/samza/container/SystemStreamPartitionGrouperTestBase.scala PRE-CREATION
> samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala fa10231
> samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 190bdfe
> samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 1f5e3bb
> samza-core/src/test/scala/org/apache/samza/container/ssp/groupers/TestGroupByPartition.scala PRE-CREATION
> samza-core/src/test/scala/org/apache/samza/container/ssp/groupers/TestGroupBySystemStreamPartition.scala PRE-CREATION
> samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala 21d8a78
> samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala 4f7ddcd
> samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala 70d8c80
> samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala 12f1e03
> samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala a67ecdf
> samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala 15245d4
> samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala cb6dbdf
> samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala 92ac61e
> samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala 6be9732
> samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala dae3c2c
> samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java 222c130
> samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala 0077af0
> samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala dc44a99
> samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml b2faebf
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala 01a2683
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala eb1ff54
> samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala 520f784
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala f1139f5
>
> Diff: https://reviews.apache.org/r/22215/diff/
>
>
> Testing
> -------
>
> Existing and new unit tests. Now moving on to functional testing.
>
>
> Thanks,
>
> Jakob Homan
>
>