> On July 8, 2014, 3:44 a.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala, line 205
> > <https://reviews.apache.org/r/22215/diff/3/?file=623716#file623716line205>
> >
> >     getOrElse(..., throw new SamzaException())
>
> Jakob Homan wrote:
>     I don't understand this comment.

This is an unprotected get that will throw a useless exception if the SSP isn't in the map. I usually do a getOrElse, and throw a SamzaException with a slightly more useful exception message (e.g. "Missing SystemStreamPartition in Map blah blah. Please configure Foo properly.")


> On July 8, 2014, 3:44 a.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala, line 51
> > <https://reviews.apache.org/r/22215/diff/3/?file=623723#file623723line51>
> >
> >     Not really a fan of this. The only place this is used is in the RunLoop, and the run loop actually already has the TaskName:TaskInstance map itself. I'd rather have one or the other, but not both. I mentioned this above, as well. My personal preference is no val here, and keep the map in the RunLoop. On top of this, I think you can use taskInstance.context.getTaskName.
>
> Jakob Homan wrote:
>     This value is used 28 times within the class. I'm not sure I follow. Let's talk offline.

I mean the "val" part (making it public).


> On July 8, 2014, 3:44 a.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 122
> > <https://reviews.apache.org/r/22215/diff/3/?file=623734#file623734line122>
> >
> >     This method is confusingly named. Shouldn't it be like assignTaskNamesToContainers?
> >
> >     Can you move this to TaskNamesToSystemStreamPartitions.apply()? The pattern I've been moving toward is to have wiring be done as apply() methods in companion objects. See DefaultChooser as an example. The idea is that it keeps SamzaContainer.apply from getting any worse, and keeps the wiring close to the class that the wiring is instantiating.
>
> Jakob Homan wrote:
>     This isn't used by TNTSSP directly, it's used in command builders. At this point the TNTSSPs are already created...

Yea, I am thinking of it as a complex way to instantiate a TNTSSP.
The same way that DefaultChooser.apply isn't used by DefaultChooser. It's used to construct the thing.


> On July 8, 2014, 3:44 a.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 141
> > <https://reviews.apache.org/r/22215/diff/3/?file=623734#file623734line141>
> >
> >     Can't we just do TaskNamesToSystemStreamPartitions(groups) here?
>
> Jakob Homan wrote:
>     Not sure I understand. The sspTaskNamesAsJava is defined by the closure that ends in groups, in order to not spew a bunch of local, intermediate variables into the method and to more easily delineate the work. That's then what's fed to the TNTSSP...

Yea, I'm just saying that you return groups, and then immediately wrap it in the TNTSSP. I was suggesting just doing the wrapping on the last return line to remove one line of code.


> On July 8, 2014, 3:44 a.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 249
> > <https://reviews.apache.org/r/22215/diff/3/?file=623734#file623734line249>
> >
> >     For Kafka, do we have to run a topic partition expansion on the changelog partition count in order for this to work?
>
> Jakob Homan wrote:
>     Only if they add new partitions, which is the same behavior as now.

I see. And that'd be manual, right? Can you open a follow-on ticket to make it automated?


> On July 8, 2014, 3:44 a.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala, line 301
> > <https://reviews.apache.org/r/22215/diff/3/?file=623749#file623749line301>
> >
> >     Given that you're reading through the entire checkpoint topic for partition 0, we should set a smaller segment size and enable log compaction here. The smaller segment size will allow more of the topic to be compacted, once compaction is enabled. This should drastically speed up container startup time.
>
> Jakob Homan wrote:
>     Should that be done here or by the SREs/Ops? Users may wish to keep a larger log for various reasons...

I think it should be done here, but as a follow-on ticket. I think that we need to automate as much of this as possible (checkpoint log and state log creation). Otherwise, it's just unusable.


- Chris


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/22215/#review47416
-----------------------------------------------------------


On July 10, 2014, 5:09 a.m., Jakob Homan wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/22215/
> -----------------------------------------------------------
>
> (Updated July 10, 2014, 5:09 a.m.)
>
>
> Review request for samza.
>
>
> Bugs: SAMZA-123
>     https://issues.apache.org/jira/browse/SAMZA-123
>
>
> Repository: samza
>
>
> Description
> -------
>
> Move topic partition grouping to the AM and generalize
>
>
> Diffs
> -----
>
>   .gitignore db9d3ec
>   samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java 6fad1fa
>   samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java a6e1ba6
>   samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java 78d56a9
>   samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouper.java PRE-CREATION
>   samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouperFactory.java PRE-CREATION
>   samza-api/src/main/java/org/apache/samza/container/TaskName.java PRE-CREATION
>   samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java cb40092
>   samza-api/src/main/java/org/apache/samza/task/TaskContext.java 7c1b085
>   samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala 5735a39
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 9487b58
>   samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala 364e489
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala fcafe83
>   samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala 4c2d365
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 4ca340c
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 356adbb
>   samza-core/src/main/scala/org/apache/samza/container/SystemStreamPartitionTaskNameGrouper.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 99a9841
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala 7502124
>   samza-core/src/main/scala/org/apache/samza/container/TaskNamesToSystemStreamPartitions.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupByPartition.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupBySystemStreamPartition.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/SimpleSystemStreamPartitionTaskNameGrouper.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala f8865b1
>   samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala e20e7c1
>   samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala 3d0a484
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 7214151
>   samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala 4ccd604
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 11c23d0
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala bc54f9e
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 94f6f4c
>   samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala 50d9a05
>   samza-core/src/test/scala/org/apache/samza/container/SystemStreamPartitionGrouperTestBase.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala fa10231
>   samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 190bdfe
>   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 1f5e3bb
>   samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupByPartition.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupBySystemStreamPartition.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala 21d8a78
>   samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala 4f7ddcd
>   samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala 70d8c80
>   samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala 12f1e03
>   samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala a67ecdf
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala 15245d4
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala cb6dbdf
>   samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala 92ac61e
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala 6be9732
>   samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala 72562cf
>   samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java 222c130
>   samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala 0077af0
>   samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala dc44a99
>   samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml b2faebf
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala 01a2683
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala eb1ff54
>   samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala 520f784
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala f1139f5
>
> Diff: https://reviews.apache.org/r/22215/diff/
>
>
> Testing
> -------
>
> Existing and new unit. Now moving on to function.
>
>
> Thanks,
>
> Jakob Homan
>
>
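
For reference, the getOrElse suggestion on OffsetManager.scala (line 205) could look roughly like the sketch below. It is only an illustration of the pattern, not the code under review: SamzaException and SystemStreamPartition are simplified stand-ins defined here so the snippet compiles on its own, and OffsetLookup, offsetFor, and the message wording are hypothetical.

```scala
// Simplified stand-ins for the real Samza classes (assumptions, so the
// sketch is self-contained; the real classes have different shapes).
class SamzaException(message: String) extends RuntimeException(message)
case class SystemStreamPartition(system: String, stream: String, partition: Int)

// Hypothetical helper, not part of the patch under review.
object OffsetLookup {
  // Looks up the offset for an SSP. getOrElse takes its default by name,
  // so the throw is evaluated only when the key is missing.
  def offsetFor(
      offsets: Map[SystemStreamPartition, String],
      ssp: SystemStreamPartition): String =
    offsets.getOrElse(
      ssp,
      throw new SamzaException(
        s"Missing SystemStreamPartition $ssp in offset map. " +
          s"Known partitions: ${offsets.keys.mkString(", ")}."))
}
```

Compared with a bare offsets(ssp), a missing key then fails with a message that names the partition, rather than a generic NoSuchElementException.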
