-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review152976
-----------------------------------------------------------

samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala (line 33)
<https://reviews.apache.org/r/52476/#comment222170>

    Usually more readable if you write this as a multiplication: 1 * 24 * 60 * 60 * 1000L


samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala (line 532)
<https://reviews.apache.org/r/52476/#comment222189>

    Prefer passing the one config that we need explicitly instead of passing the config object.


samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 26)
<https://reviews.apache.org/r/52476/#comment222171>

    Delete or import explicitly.


samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 29)
<https://reviews.apache.org/r/52476/#comment222190>

    Unrelated to RB but prefer explicit imports.


samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 71)
<https://reviews.apache.org/r/52476/#comment222173>

    SystemClock exists so that you can pass a "Clock" to your method/class and mock it in tests. Let's either do that (preferred) or use System.currentTimeMillis() directly.


samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 98)
<https://reviews.apache.org/r/52476/#comment222184>

    Looks like we've updated `fileOffset` in `#readOffsetFile` as a side effect even when the store is stale. Is that what we want here?


samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 100)
<https://reviews.apache.org/r/52476/#comment222175>

    Add an INFO message here.


samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 106)
<https://reviews.apache.org/r/52476/#comment222176>

    Add method description.


samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 113)
<https://reviews.apache.org/r/52476/#comment222177>

    Another case we ran into on Friday - if the oldest offset in the changelog topic is newer than the offset in the OFFSET file.
    Do you need to handle that here?

    Nitpick: would isStaleStore be clearer?


samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 118)
<https://reviews.apache.org/r/52476/#comment222180>

    Looks like this is already logged at line 163?


samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 119)
<https://reviews.apache.org/r/52476/#comment222179>

    Don't `return` in Scala code.


samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 121)
<https://reviews.apache.org/r/52476/#comment222181>

    Mention somewhere in the message that this means that the store is stale.


samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 150)
<https://reviews.apache.org/r/52476/#comment222178>

    I'd prefer to split this into two methods - existence check and file read. Would be even nicer if fileOffset was updated explicitly (after staleness checks etc.) and not as a side effect of reading the file.

    If you don't, let's add a return type to the method signature.


samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 155)
<https://reviews.apache.org/r/52476/#comment222182>

    Unrelated, but let's make this info.


- Prateek Maheshwari


On Oct. 17, 2016, 3:40 p.m., Shanthoosh Venkataraman wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/52476/
> -----------------------------------------------------------
>
> (Updated Oct. 17, 2016, 3:40 p.m.)
>
>
> Review request for samza.
>
>
> Repository: samza
>
>
> Description
> -------
>
> Every local task store is backed up by a Kafka changelog topic. Due to log
> compaction, delete tombstones of the changelog topic have a TTL of
> delete.retention.ms.
> Replaying the events from a changelog that is missing delete tombstones
> would create an inconsistent local store (because some delete events are
> missing). During container startup, this patch deletes the local stores for
> which the difference between the current time and the last modified time of
> the offset file is greater than delete.retention.ms.
>
>
> Diffs
> -----
>
>   samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 9329edf7d724f3a0d9235354bb77936f713e3b5f
>   samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala be3f1068f7921c9c8f8697577efea6580672813e
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 05a996c98075ea8ed3767af666b9beeb1933f2a6
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 0b7bcdda1639eea8239a69c31bdf42558e9077d2
>   samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala 4d40f520e54beb643acd8410c772b75e2f6a9162
>   samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 973ab8cfb3d248bec7efe5e338f5e667f097556d
>
> Diff: https://reviews.apache.org/r/52476/diff/
>
>
> Testing
> -------
>
> Unit and manual testing have been done to verify the functionality.
>
>
> Thanks,
>
> Shanthoosh Venkataraman
>
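
For reference, a rough sketch of how the comments above on the retention constant, the Clock, the `return`, and the isStaleStore naming might fit together. This is only an illustration, not the code in the patch: the `Clock` trait here is a hypothetical stand-in for whatever Samza's clock abstraction looks like, and `StoreStaleness` and `DefaultDeleteRetentionMs` are made-up names. It also assumes the caller passes in the single retention value rather than the whole config object.

```scala
// Hypothetical stand-in for a clock abstraction that tests can replace.
trait Clock {
  def currentTimeMillis(): Long
}

// Hypothetical holder for the staleness logic; not an actual Samza class.
object StoreStaleness {
  // Assumed default of one day, written as a multiplication for readability.
  val DefaultDeleteRetentionMs: Long = 1 * 24 * 60 * 60 * 1000L

  /**
   * Decides whether a local store should be treated as stale.
   *
   * @param offsetFileLastModifiedMs last modified time of the offset file, in epoch millis
   * @param deleteRetentionMs        the changelog topic's delete.retention.ms, passed explicitly
   * @param clock                    source of the current time, injectable for tests
   * @return true if the offset file is older than deleteRetentionMs
   */
  def isStaleStore(offsetFileLastModifiedMs: Long, deleteRetentionMs: Long, clock: Clock): Boolean = {
    val ageMs = clock.currentTimeMillis() - offsetFileLastModifiedMs
    // The last expression is the result; no explicit `return` is needed.
    ageMs > deleteRetentionMs
  }
}
```

With the clock passed in, a test can supply a fixed time instead of depending on the wall clock, and the caller can update `fileOffset` explicitly only after this check says the store is not stale.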