-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/28016/#review61445
-----------------------------------------------------------

Nit: space after //

Just to make code more uniform.


docs/learn/documentation/versioned/jobs/configuration-table.html
<https://reviews.apache.org/r/28016/#comment103025>

    Do we really need this config? What's the use case where you'd want to
    set the changelog partition count to something other than the correct
    size (which is automatically determined in the container)?


samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
<https://reviews.apache.org/r/28016/#comment103024>

    Should use "stream" instead of "topic" in this API. Samza doesn't use
    "topic".

    I think this method should be
    createChangelogStream(String streamName, int partitions). The reason
    for keeping it changelog-specific is that we might want custom configs
    for changelog streams (e.g. custom replication count, segment size in
    Kafka, etc). If we don't make the API specific to the changelog,
    there's no way to have custom configs for changelogs (since we won't
    know if the topic being created is a changelog or not).

    Also, the pattern that I'm following in SAMZA-448 for adding
    create*Stream methods to the SystemAdmin is to have an interface that
    extends SystemAdmin with just the create method in it (e.g.
    ChangelogSystemAdmin). Then, only KafkaSystemAdmin would implement the
    new interface, whereas the FileReaderSystemAdmin would not. This was
    based on Yan's feedback.


samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
<https://reviews.apache.org/r/28016/#comment103026>

    Don't need this if you have a ChangelogSystemAdmin.


samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
<https://reviews.apache.org/r/28016/#comment103039>

    Remove. Move getChangeLogOldestOffsetsForPartition into
    TaskStorageManager.


samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
<https://reviews.apache.org/r/28016/#comment103030>

    Not needed.

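
A rough sketch of the ChangelogSystemAdmin idea from the SystemAdmin.java
comment above, in case it helps. This is not code from the patch or from
SAMZA-448. The SystemAdmin below is an empty stand-in for the real
org.apache.samza.system.SystemAdmin, and InMemoryChangelogAdmin,
ReadOnlyAdmin, and ChangelogCreator are hypothetical names used only for
illustration. Only ChangelogSystemAdmin and createChangelogStream come from
the comment itself.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for org.apache.samza.system.SystemAdmin. The real interface has
// offset and metadata methods, which are omitted here.
interface SystemAdmin {
}

// Extension interface as proposed in the review: it adds only the create
// method, so systems that cannot create streams never have to implement it.
interface ChangelogSystemAdmin extends SystemAdmin {
  void createChangelogStream(String streamName, int partitions);
}

// Hypothetical admin that plays the role of KafkaSystemAdmin. It records the
// requested partition count in memory instead of talking to a broker.
class InMemoryChangelogAdmin implements ChangelogSystemAdmin {
  private final Map<String, Integer> streams = new HashMap<>();

  @Override
  public void createChangelogStream(String streamName, int partitions) {
    if (partitions <= 0) {
      throw new IllegalArgumentException("partitions must be positive: " + partitions);
    }
    // Keep the first definition if the stream was already created.
    streams.putIfAbsent(streamName, partitions);
  }

  // Returns the recorded partition count, or null if the stream is unknown.
  Integer partitionCount(String streamName) {
    return streams.get(streamName);
  }
}

// Hypothetical admin that plays the role of FileReaderSystemAdmin: it
// implements SystemAdmin only, so it needs no stub create method.
class ReadOnlyAdmin implements SystemAdmin {
}

// Hypothetical caller, roughly where TaskStorageManager would sit.
class ChangelogCreator {
  // Creates the changelog stream if the admin supports it; returns whether
  // creation was attempted.
  static boolean createIfSupported(SystemAdmin admin, String streamName, int partitions) {
    if (admin instanceof ChangelogSystemAdmin) {
      ((ChangelogSystemAdmin) admin).createChangelogStream(streamName, partitions);
      return true;
    }
    return false;
  }
}
```

With this shape, the SinglePartitionWithoutOffsetsSystemAdmin,
FileReaderSystemAdmin, and MockSystemAdmin changes would presumably not be
needed, since none of them would implement the new interface.
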

samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
<https://reviews.apache.org/r/28016/#comment103036>

    Prefer = Map() over null.


samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
<https://reviews.apache.org/r/28016/#comment103040>

    createStreams


samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
<https://reviews.apache.org/r/28016/#comment103034>

    No need for :Unit =


samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
<https://reviews.apache.org/r/28016/#comment103031>

    Remove newlines.


samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
<https://reviews.apache.org/r/28016/#comment103033>

    Message in SamzaException.

    Space after ,


samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
<https://reviews.apache.org/r/28016/#comment103032>

    Indentation.


samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment103043>

    Don't need if we have a ChangelogSystemAdmin.


samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
<https://reviews.apache.org/r/28016/#comment103044>

    Don't need if we have a ChangelogSystemAdmin.


samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
<https://reviews.apache.org/r/28016/#comment103048>

    I don't think we need this, do we? Is there ever a case where you'd
    want to set the changelog partition count to a non-default number?


samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
<https://reviews.apache.org/r/28016/#comment103045>

    stores.%s.changelog.kafka (typo on changelog, and kafkaprops -> kafka)


samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
<https://reviews.apache.org/r/28016/#comment103047>

    map -> foreach


samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment103054>

    Javadocs.

    After seeing the implementation, I prefer not passing a config object
    into the constructor, and just having a constructor with a lot of
    defaults (as we had before). The reasons I don't like this are:

    1. It's impossible to know what to set in the config to create the
    KafkaSystemAdmin without reading its code.
    2. It bleeds wiring into the class itself. These are two separate
    concerns, and should be treated separately.
    3. It makes writing tests harder, since you have to create a config
    object, rather than just passing in the params.

    Recommend switching back to the old style, and just adding new params
    to the constructor.


samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment103057>

    Javadocs.


samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment103056>

    Javadocs.


samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment103065>

    This code looks to be pretty much copied from KafkaCheckpointManager.
    Can we just have the code once, and call it in both places? Maybe in
    KafkaUtil?


samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment103061>

    Don't think we need this. Should just use numKafkaChangeLogPartitions.

    Also, we should try and standardize on "Changelog" not "ChangeLog".


samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment103062>

    checkpoint topic -> changelog topic


samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment103063>

    Checkpoint -> changelog.


samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment103067>

    This looks to be copied from KafkaCheckpointManager as well.
    Can we converge on one util method in KafkaUtil, and call from both
    spots?


samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment103068>

    Checkpoint again.


samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment103059>

    What is this? Javadocs.


samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
<https://reviews.apache.org/r/28016/#comment103069>

    Not needed if we have ChangelogSystemAdmin.


samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
<https://reviews.apache.org/r/28016/#comment103071>

    Remove.


- Chris Riccomini


On Nov. 13, 2014, 11:58 p.m., Naveen Somasundaram wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/28016/
> -----------------------------------------------------------
> 
> (Updated Nov. 13, 2014, 11:58 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> I have added a new method to the system admin as discussed in the jira, the
> task storage manager fetches all the information necessary and creates the
> change log topic using the system admin.
> 
> PENDING: I have to update the Samza docs with the new configurations added,
> will update the rb with docs updates
> 
> 
> Diffs
> -----
> 
>   docs/learn/documentation/versioned/jobs/configuration-table.html 4266a137ae003e946e11c122d94061c31d643c77
>   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 571c60631357ea9a0b4fa24e7253008619ef2f32
>   samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 38e313f3c39454110efd354e6ca025869fa930cd
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala d91d6d7940bd07a145dd3b782a9239f24bb5cf2e
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b8719c36c2b9346bcd3f291e23b33d2c00cebfa9
>   samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala 98e92bc12f3e2827cdec02f1ce94d7e2314e4b4e
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala a79eccaa8fc18d197b77f9363f1814fefc4ac40d
>   samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 9fc1f56d4404ec7722c0d34fde2804e981b41309
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 5ac33ea36da451250655d9dd373692b964322b41
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala 4ed5e881031e019d8df6de259cabb658820a3ba0
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala 5ceb1093a66cb57e298d4b3ccdd24845dbb41b58
>   samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java fa1d51b290013a3913d64884dc43907a76670849
>   samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala 118f5eee22016db3b802c32fb26c5d72fa61f1a7
> 
> Diff: https://reviews.apache.org/r/28016/diff/
> 
> 
> Testing
> -------
> 
> Modified TestStatefulTask to disable auto creation of topics and the test
> seems to work.
> 
> 
> Thanks,
> 
> Naveen Somasundaram
> 
>
