-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/28016/#review62262
-----------------------------------------------------------

samza-api/src/main/java/org/apache/samza/config/Config.java
<https://reviews.apache.org/r/28016/#comment104273>

Done!

samza-api/src/main/java/org/apache/samza/config/Config.java
<https://reviews.apache.org/r/28016/#comment104276>

done

samza-api/src/main/java/org/apache/samza/config/Config.java
<https://reviews.apache.org/r/28016/#comment104277>

done

samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
<https://reviews.apache.org/r/28016/#comment104278>

done!

samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
<https://reviews.apache.org/r/28016/#comment104280>

done

samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/28016/#comment104282>

I didn't even realize it was in a loop. Once again, I've been tricked by .map! Moved it outside the loop.

samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/28016/#comment104283>

Done

samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/28016/#comment104284>

I removed it, but I wasn't sure why it is there for the other parameters. I had only added it to be consistent. Removed for now.

samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
<https://reviews.apache.org/r/28016/#comment104287>

Done

samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
<https://reviews.apache.org/r/28016/#comment104289>

Done

samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
<https://reviews.apache.org/r/28016/#comment104290>

Removed %s, it was redundant

samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
<https://reviews.apache.org/r/28016/#comment104291>

done!

samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment104292>

Single new line?
Fixed

samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
<https://reviews.apache.org/r/28016/#comment104293>

done

samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
<https://reviews.apache.org/r/28016/#comment104332>

Done!

samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
<https://reviews.apache.org/r/28016/#comment104333>

Fixed to verify against the factory name

samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
<https://reviews.apache.org/r/28016/#comment104334>

done

samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
<https://reviews.apache.org/r/28016/#comment104335>

done

samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
<https://reviews.apache.org/r/28016/#comment104336>

done

samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
<https://reviews.apache.org/r/28016/#comment104337>

done

samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment104362>

done

samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment104363>

done

samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment104364>

Done

samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment104365>

Done

samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment104366>

Done

samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment104367>

Done

samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment104368>

I thought about it a little bit more and tried out abstracting it, but couldn't figure out a clean way to do it.
The way to do this would be to add a shared validateTopic(topicName, function_that_validates_the_condition). validateTopic would then run the validation function inside the retryLoop, but the retryLoop also takes an exception condition that decides which exceptions are rethrown. If we want validateTopic to be shared, we have two options:

1. Catch both KafkaChangeLogException and KafkaCheckpointException and rethrow both of them. This is not strictly correct behavior: changelog creation could throw a checkpoint exception and we would stop retrying when we should not (or the other way around).

2. Change validateTopic to validateTopic(topicName, function_that_validates_the_condition, exceptions_to_ignore). This is the more correct approach, but I'm not sure what we gain by abstracting to this level, because it looks almost like the retry loop itself, with signature (function_that_validates_the_condition, exceptions_to_handle).

I'm going to leave it as-is, unless you can think of a better abstraction :)

samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment104369>

removed!

samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment104370>

Removed!

samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment104371>

Done!
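
For reference on the validateTopic discussion in comment104368 above, here is a rough sketch of what option 2 might look like, written in plain Java so it is self-contained. Everything in it is hypothetical and for illustration only: retryLoop, validateTopic, ChangelogException, CheckpointException and the maxAttempts parameter are stand-ins, not the actual Samza retry loop, KafkaSystemAdmin code, or Kafka exception classes. It is only meant to show that once the exception policy becomes a parameter, validateTopic does little more than forward its arguments to the retry loop.

```java
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;

public class ValidateTopicSketch {
    // Hypothetical stand-ins for the two exception types named in the discussion.
    static class ChangelogException extends RuntimeException {
        ChangelogException(String msg) { super(msg); }
    }

    static class CheckpointException extends RuntimeException {
        CheckpointException(String msg) { super(msg); }
    }

    /**
     * Hypothetical retry loop: evaluates the condition up to maxAttempts times.
     * Exceptions matching `rethrow` abort the loop immediately; any other
     * RuntimeException is swallowed and the condition is tried again.
     */
    static boolean retryLoop(BooleanSupplier condition,
                             Predicate<RuntimeException> rethrow,
                             int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                if (condition.getAsBoolean()) {
                    return true;
                }
            } catch (RuntimeException e) {
                if (rethrow.test(e)) {
                    throw e;
                }
            }
        }
        return false;
    }

    /**
     * Option 2: the caller passes the exceptions to ignore. The body only
     * binds the topic name and inverts the predicate for the retry loop.
     */
    static boolean validateTopic(String topicName,
                                 Predicate<String> validator,
                                 Predicate<RuntimeException> ignore,
                                 int maxAttempts) {
        return retryLoop(() -> validator.test(topicName),
                         e -> !ignore.test(e),
                         maxAttempts);
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // The validator fails twice with an ignored exception, then succeeds.
        boolean ok = validateTopic(
            "my-changelog-topic",
            topic -> {
                if (calls[0]++ < 2) {
                    throw new CheckpointException("transient failure");
                }
                return true;
            },
            e -> e instanceof CheckpointException,
            5);
        System.out.println("validated=" + ok + " attempts=" + calls[0]);
    }
}
```

In this sketch validateTopic adds only the topic-name binding on top of retryLoop, which is the reason given above for leaving the code as-is.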

samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
<https://reviews.apache.org/r/28016/#comment104372>

switched to var

samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
<https://reviews.apache.org/r/28016/#comment104373>

done

samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
<https://reviews.apache.org/r/28016/#comment104374>

Done

samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
<https://reviews.apache.org/r/28016/#comment104375>

Removed. This must have been a copy-paste error (source of all evil). There is another copy of the same variable above.


- Naveen Somasundaram


On Nov. 19, 2014, 2:12 a.m., Naveen Somasundaram wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/28016/
> -----------------------------------------------------------
>
> (Updated Nov. 19, 2014, 2:12 a.m.)
>
>
> Review request for samza.
>
>
> Repository: samza
>
>
> Description
> -------
>
> I have added a new method to the system admin as discussed in the jira. The
> task storage manager fetches all the information necessary and creates the
> change log topic using the system admin.
>
> PENDING: I have to update the Samza docs with the new configurations added,
> will update the rb with docs updates
>
>
> Diffs
> -----
>
>   .reviewboardrc 9339119e248e41f954d47e1d01a0f2e1130d349c
>   docs/learn/documentation/versioned/jobs/configuration-table.html 4266a137ae003e946e11c122d94061c31d643c77
>   samza-api/src/main/java/org/apache/samza/config/Config.java 2048e90e80e21086eb59be57f3bcd5ebf92b2811
>   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 571c60631357ea9a0b4fa24e7253008619ef2f32
>   samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 38e313f3c39454110efd354e6ca025869fa930cd
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala d91d6d7940bd07a145dd3b782a9239f24bb5cf2e
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b8719c36c2b9346bcd3f291e23b33d2c00cebfa9
>   samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala 98e92bc12f3e2827cdec02f1ce94d7e2314e4b4e
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala a79eccaa8fc18d197b77f9363f1814fefc4ac40d
>   samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 9fc1f56d4404ec7722c0d34fde2804e981b41309
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 5ac33ea36da451250655d9dd373692b964322b41
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala 4ed5e881031e019d8df6de259cabb658820a3ba0
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala 5ceb1093a66cb57e298d4b3ccdd24845dbb41b58
>   samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java fa1d51b290013a3913d64884dc43907a76670849
>   samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala 118f5eee22016db3b802c32fb26c5d72fa61f1a7
>
> Diff: https://reviews.apache.org/r/28016/diff/
>
>
> Testing
> -------
>
> Modified TestStatefulTask to disable auto creation of topics and the test
> seems to work.
>
>
> Thanks,
>
> Naveen Somasundaram
>
>
