> On July 8, 2014, 3:44 a.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala, 
> > line 205
> > <https://reviews.apache.org/r/22215/diff/3/?file=623716#file623716line205>
> >
> >     getOrElse(..., throw new SamzaException())
> 
> Jakob Homan wrote:
>     I don't understand this comment.

This is an unprotected get that will throw a useless exception if the SSP isn't 
in the map. I usually do a getOrElse, and throw a SamzaException with a 
slightly more useful exception message (e.g. "Missing SystemStreamPartition in 
Map blah blah. Please configure Foo properly.")
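
Roughly what I have in mind, as a minimal sketch rather than the actual 
OffsetManager code. The SamzaException class below is a local stand-in for 
org.apache.samza.SamzaException so the snippet compiles on its own, and 
offsetFor, the String keys, and the message wording are made up for 
illustration:

```scala
// Local stand-in for org.apache.samza.SamzaException (assumption, so the sketch is self-contained).
class SamzaException(msg: String) extends RuntimeException(msg)

object OffsetLookup {
  // Hypothetical helper: look up an offset by SSP name. The default argument of
  // getOrElse is evaluated lazily, so the throw only happens when the key is missing.
  def offsetFor(offsets: Map[String, String], ssp: String): String =
    offsets.getOrElse(ssp, throw new SamzaException(
      s"Missing SystemStreamPartition $ssp in offset map. " +
        s"Known SSPs: ${offsets.keys.mkString(", ")}"))
}
```

The point is only that the failure names the missing key and what was 
available, instead of a bare NoSuchElementException.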


> On July 8, 2014, 3:44 a.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala, 
> > line 51
> > <https://reviews.apache.org/r/22215/diff/3/?file=623723#file623723line51>
> >
> >     Not really a fan of this. The only place this is used is in the 
> > RunLoop, and the run loop actually already has the TaskName:TaskInstance 
> > map itself. I'd rather have one or the other, but not both. I mentioned 
> > this above, as well. My personal preference is no val here, and keep the 
> > map in the RunLoop. On top of this, I think you can use 
> > taskInstance.context.getTaskName.
> 
> Jakob Homan wrote:
>     This value is used 28 times within the class. I'm not sure I follow.  
> Let's talk offline.

I mean the "val" part (making it public).


> On July 8, 2014, 3:44 a.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 122
> > <https://reviews.apache.org/r/22215/diff/3/?file=623734#file623734line122>
> >
> >     This method is confusingly named. Shouldn't it be like 
> > assignTaskNamesToContainers?
> >     
> >     Can you move this to TaskNamesToSystemStreamPartitions.apply()? The 
> > pattern I've been moving toward is to have wiring be done as apply() 
> > methods in companion objects. See DefaultChooser as an example. The idea is 
> > that it keeps SamzaContainer.apply from getting any worse, and keeps the 
> > wiring close to the class that the wiring is instantiating.
> 
> Jakob Homan wrote:
>     This isn't used by TNTSSP directly, it's used in command builders.  At 
> this point the TNTSSPs are already created...

Yea, I am thinking of it as a complex way to instantiate a TNTSSP. The same way 
that DefaultChooser.apply isn't used by DefaultChooser. It's used to construct 
the thing.
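
To illustrate the pattern, here is a simplified sketch. TaskGroups is a 
hypothetical stand-in for TNTSSP, and the flat (taskName, ssp) input is an 
assumption, not what Util actually receives:

```scala
// Hypothetical, simplified stand-in for TaskNamesToSystemStreamPartitions.
class TaskGroups(val groups: Map[String, Set[String]]) {
  def sspsFor(taskName: String): Set[String] = groups.getOrElse(taskName, Set.empty)
}

object TaskGroups {
  // The companion's apply() does the wiring: it turns flat (taskName, ssp)
  // pairs into the grouped map and constructs the instance. The class itself
  // never calls this; callers use it to build the thing.
  def apply(pairs: Seq[(String, String)]): TaskGroups =
    new TaskGroups(pairs.groupBy(_._1).map { case (task, ps) =>
      task -> ps.map(_._2).toSet
    })
}
```

Callers then write TaskGroups(pairs) and the construction logic stays next to 
the class instead of in Util or SamzaContainer.apply.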


> On July 8, 2014, 3:44 a.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 141
> > <https://reviews.apache.org/r/22215/diff/3/?file=623734#file623734line141>
> >
> >     Can't we just do TaskNamesToSystemStreamPartitions(groups) here?
> 
> Jakob Homan wrote:
>     Not sure I understand.  The sspTaskNamesAsJava is defined by the closure 
> that ends in groups, in order to not spew a bunch of local, intermediate 
> variables into the method and to more easily delineate the work.  That's then 
> what's fed to the TNTSSP...

Yea, I'm just saying that you return groups, and then immediately wrap it in 
the TNTSSP. I was suggesting just doing the wrapping on the last return line to 
remove one line of code.
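
A sketch of the two shapes, assuming the current code looks roughly like 
twoSteps below. Tntssp, the sample pairs, and both method names are 
hypothetical; only the placement of the wrapping is the point:

```scala
// Hypothetical stand-in for TaskNamesToSystemStreamPartitions.
case class Tntssp(groups: Map[String, Set[String]])

object WrapOnReturn {
  private val pairs = Seq("Partition 0" -> "kafka.a.0", "Partition 1" -> "kafka.a.1")

  // Assumed current shape: the block yields groups, which is wrapped afterwards.
  def twoSteps: Tntssp = {
    val groups = {
      val byTask = pairs.groupBy(_._1)
      byTask.map { case (task, ps) => task -> ps.map(_._2).toSet }
    }
    Tntssp(groups)
  }

  // Suggested shape: wrap on the last line of the block itself.
  def oneStep: Tntssp = {
    val byTask = pairs.groupBy(_._1)
    Tntssp(byTask.map { case (task, ps) => task -> ps.map(_._2).toSet })
  }
}
```

Both produce the same value; the intermediate variables still stay scoped 
inside the block.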


> On July 8, 2014, 3:44 a.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 249
> > <https://reviews.apache.org/r/22215/diff/3/?file=623734#file623734line249>
> >
> >     For Kafka, do we have to run a topic partition expansion on the 
> > changelog partition count in order for this to work?
> 
> Jakob Homan wrote:
>     Only if they add new partitions, which is the same behavior as now.

I see. And that'd be manual, right? Can you open a follow-on ticket to make it 
automated?


> On July 8, 2014, 3:44 a.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala,
> >  line 301
> > <https://reviews.apache.org/r/22215/diff/3/?file=623749#file623749line301>
> >
> >     Given that you're reading through the entire checkpoint topic for 
> > partition 0, we should set a smaller segment size and enable log 
> > compaction here. The smaller segment size will allow more of the topic to 
> > be compacted, once compaction is enabled. This should drastically speed up 
> > container startup time.
> 
> Jakob Homan wrote:
>     Should that be done here or by the SREs/Ops? Users may wish to keep a 
> larger log for various reasons...

I think it should be done here, but as a follow-on ticket. I think that we need 
to automate as much of this as possible (checkpoint log and state log 
creation). Otherwise, it's just unusable.
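
For the follow-on ticket, the topic settings I am thinking of could be built 
like this. cleanup.policy and segment.bytes are Kafka's topic-level config 
keys; the CheckpointTopicConfig name and the 25 MB default are assumptions for 
illustration, not values from the patch:

```scala
import java.util.Properties

object CheckpointTopicConfig {
  // Hypothetical default; the right segment size depends on checkpoint volume.
  val DefaultSegmentBytes: Int = 25 * 1024 * 1024

  // Builds topic-level overrides for the checkpoint topic: compaction keeps
  // only the latest message per key, and a smaller segment size lets more of
  // the log become eligible for compaction.
  def apply(segmentBytes: Int = DefaultSegmentBytes): Properties = {
    val props = new Properties()
    props.setProperty("cleanup.policy", "compact")
    props.setProperty("segment.bytes", segmentBytes.toString)
    props
  }
}
```

Taking the segment size as a parameter would let users who want a larger log 
override it through config.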


- Chris


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/22215/#review47416
-----------------------------------------------------------


On July 10, 2014, 5:09 a.m., Jakob Homan wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/22215/
> -----------------------------------------------------------
> 
> (Updated July 10, 2014, 5:09 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Bugs: SAMZA-123
>     https://issues.apache.org/jira/browse/SAMZA-123
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Move topic partition grouping to the AM and generalize
> 
> 
> Diffs
> -----
> 
>   .gitignore db9d3ec 
>   samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java 6fad1fa 
>   samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java 
> a6e1ba6 
>   
> samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java 
> 78d56a9 
>   
> samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouper.java
>  PRE-CREATION 
>   
> samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouperFactory.java
>  PRE-CREATION 
>   samza-api/src/main/java/org/apache/samza/container/TaskName.java 
> PRE-CREATION 
>   samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java cb40092 
>   samza-api/src/main/java/org/apache/samza/task/TaskContext.java 7c1b085 
>   samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala 
> 5735a39 
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 
> 9487b58 
>   
> samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
>  364e489 
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala fcafe83 
>   samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala 
> 4c2d365 
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 4ca340c 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 356adbb 
>   
> samza-core/src/main/scala/org/apache/samza/container/SystemStreamPartitionTaskNameGrouper.scala
>  PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
> 99a9841 
>   
> samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
>  7502124 
>   
> samza-core/src/main/scala/org/apache/samza/container/TaskNamesToSystemStreamPartitions.scala
>  PRE-CREATION 
>   
> samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupByPartition.scala
>  PRE-CREATION 
>   
> samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupBySystemStreamPartition.scala
>  PRE-CREATION 
>   
> samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/SimpleSystemStreamPartitionTaskNameGrouper.scala
>  PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala 
> f8865b1 
>   samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala 
> e20e7c1 
>   
> samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala 
> 3d0a484 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
> 7214151 
>   samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala 
> 4ccd604 
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 11c23d0 
>   
> samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
>  bc54f9e 
>   
> samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 
> 94f6f4c 
>   
> samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala
>  50d9a05 
>   
> samza-core/src/test/scala/org/apache/samza/container/SystemStreamPartitionGrouperTestBase.scala
>  PRE-CREATION 
>   samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 
> fa10231 
>   
> samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
> 190bdfe 
>   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 
> 1f5e3bb 
>   
> samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupByPartition.scala
>  PRE-CREATION 
>   
> samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupBySystemStreamPartition.scala
>  PRE-CREATION 
>   samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala 21d8a78 
>   
> samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala 
> PRE-CREATION 
>   samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala 
> 4f7ddcd 
>   
> samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala
>  70d8c80 
>   
> samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala 
> 12f1e03 
>   samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala a67ecdf 
>   
> samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
>  15245d4 
>   
> samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
>  cb6dbdf 
>   
> samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
>  92ac61e 
>   
> samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
>  6be9732 
>   
> samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
>  72562cf 
>   
> samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java 
> 222c130 
>   
> samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
>  0077af0 
>   
> samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
>  dc44a99 
>   samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml b2faebf 
>   
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala 
> 01a2683 
>   
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
>  eb1ff54 
>   
> samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
>  520f784 
>   
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
>  f1139f5 
> 
> Diff: https://reviews.apache.org/r/22215/diff/
> 
> 
> Testing
> -------
> 
> Existing and new unit tests.  Now moving on to functional tests.
> 
> 
> Thanks,
> 
> Jakob Homan
> 
>
