-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review153478
-----------------------------------------------------------

Looks pretty good, few final comments.


samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java (line 253)
<https://reviews.apache.org/r/52476/#comment222805>

    Does jobConfig.getChangeLog...() (implicit conversion) not work?


samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala (line 57)
<https://reviews.apache.org/r/52476/#comment222811>

    getChangeLogDeleteRetentionsInMs


samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala (line 58)
<https://reviews.apache.org/r/52476/#comment222812>

    See if you can use the named operator instead of the symbolic operator. I think you might be able to .toMap on the list of pairs.


samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 104)
<https://reviews.apache.org/r/52476/#comment222827>

    Indent by 4 (or whatever continuation indent is)


samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 106)
<https://reviews.apache.org/r/52476/#comment222821>

    If loggedStoreDir isn't present we put null into fileOffset. If that's the expected behavior, let's log at info in isStateLoggedStore if (!loggedStoreDir.exists())


samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 108)
<https://reviews.apache.org/r/52476/#comment222815>

    Misleading comment, could be the other condition too.


samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 111)
<https://reviews.apache.org/r/52476/#comment222818>

    Will be useful to log at info the read file offset here (or in readOffsetFile)


samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 112)
<https://reviews.apache.org/r/52476/#comment222813>

    Indent by 4 (or whatever continuation indent is)


samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 118)
<https://reviews.apache.org/r/52476/#comment222819>

    Add method description.
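
The comments on lines 106 and 111 of TaskStorageManager.scala ask for info-level logging when the logged store directory is missing and when an offset is read. A minimal Java sketch of that pattern, assuming the offset is kept in a file named `OFFSET` inside the logged store directory; the class and method names (`OffsetFileSketch`, `readOffset`, `writeOffset`) are hypothetical, and `java.util.logging` stands in for Samza's own logging so the example is self-contained.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.logging.Logger;

final class OffsetFileSketch {
  private static final Logger LOG = Logger.getLogger(OffsetFileSketch.class.getName());

  // Assumed offset file name (hypothetical); the real store layout may differ.
  static final String OFFSET_FILE_NAME = "OFFSET";

  /**
   * Reads the changelog offset stored in loggedStoreDir.
   * Returns null, and logs at info, when the directory or the offset file is absent.
   */
  static String readOffset(File loggedStoreDir) {
    if (!loggedStoreDir.exists()) {
      LOG.info(String.format("Logged store dir %s does not exist; no offset read.", loggedStoreDir));
      return null;
    }
    File offsetFile = new File(loggedStoreDir, OFFSET_FILE_NAME);
    if (!offsetFile.exists()) {
      LOG.info(String.format("Offset file %s does not exist; no offset read.", offsetFile));
      return null;
    }
    try {
      String offset = new String(Files.readAllBytes(offsetFile.toPath()), StandardCharsets.UTF_8).trim();
      // Log the value that was read, as suggested for line 111.
      LOG.info(String.format("Read offset %s from file %s", offset, offsetFile));
      return offset;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Writes an offset file, creating loggedStoreDir if needed (helper for exercising readOffset). */
  static void writeOffset(File loggedStoreDir, String offset) {
    loggedStoreDir.mkdirs();
    try {
      Files.write(new File(loggedStoreDir, OFFSET_FILE_NAME).toPath(), offset.getBytes(StandardCharsets.UTF_8));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Returning null mirrors the behavior the comment on line 106 describes; returning an `Optional<String>` instead would make the "no offset" case explicit at call sites.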

samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 130)
<https://reviews.apache.org/r/52476/#comment222824>

    First %s should be store name. Add another %s at the end for loggedStoreDir.


samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 133)
<https://reviews.apache.org/r/52476/#comment222822>

    log both last modified time and delete retention ms values too.


samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 133)
<https://reviews.apache.org/r/52476/#comment222823>

    Log both last modified time and delete retention ms values too.


samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 182)
<https://reviews.apache.org/r/52476/#comment222826>

    s/partition/logged storage partition to be consistent with next message.


samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala (line 144)
<https://reviews.apache.org/r/52476/#comment222830>

    implicit conversion should probably work.


- Prateek Maheshwari


On Oct. 19, 2016, 3:04 p.m., Shanthoosh Venkataraman wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/52476/
> -----------------------------------------------------------
> 
> (Updated Oct. 19, 2016, 3:04 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Every local task store is backed up by a Kafka changelog topic. Due to log
> compaction, delete tombstones in the changelog topic are retained only for
> delete.retention.ms. Replaying events from a changelog that is missing
> delete tombstones would create an inconsistent local store (because some
> delete events are lost). During container startup, this patch deletes every
> local store whose offset file was last modified more than delete.retention.ms
> before the current time.
> 
> 
> Diffs
> -----
> 
>   samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 9329edf7d724f3a0d9235354bb77936f713e3b5f 
>   samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala be3f1068f7921c9c8f8697577efea6580672813e 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 05a996c98075ea8ed3767af666b9beeb1933f2a6 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 0b7bcdda1639eea8239a69c31bdf42558e9077d2 
>   samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala 4d40f520e54beb643acd8410c772b75e2f6a9162 
>   samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 973ab8cfb3d248bec7efe5e338f5e667f097556d 
> 
> Diff: https://reviews.apache.org/r/52476/diff/
> 
> 
> Testing
> -------
> 
> Unit testing and manual testing have been done to verify the functionality.
> 
> 
> Thanks,
> 
> Shanthoosh Venkataraman
> 
>
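
The startup check in the description, together with the comments on lines 130 and 133 (store name first, loggedStoreDir last, and both the last modified time and the delete retention in the message), can be sketched in Java as below. This is a minimal sketch under stated assumptions, not the patch itself: `StaleStoreCheck`, `isStale` and `shouldDeleteStore` are hypothetical names, the retention value is passed in rather than read from config, and `java.util.logging` replaces Samza's logging.

```java
import java.io.File;
import java.util.logging.Logger;

final class StaleStoreCheck {
  private static final Logger LOG = Logger.getLogger(StaleStoreCheck.class.getName());

  /** True when more than deleteRetentionMs has elapsed between lastModifiedMs and nowMs. */
  static boolean isStale(long lastModifiedMs, long deleteRetentionMs, long nowMs) {
    return nowMs - lastModifiedMs > deleteRetentionMs;
  }

  /**
   * Decides whether a logged store should be deleted at startup, based on its offset file's
   * last modified time, and logs every value that drove the decision.
   */
  static boolean shouldDeleteStore(String storeName, File loggedStoreDir, File offsetFile,
                                   long deleteRetentionMs, long nowMs) {
    // File.lastModified() returns 0L when the file does not exist,
    // so in this sketch a missing offset file is treated as stale.
    long lastModifiedMs = offsetFile.lastModified();
    boolean stale = isStale(lastModifiedMs, deleteRetentionMs, nowMs);
    // Store name first and store directory last, as requested in the review.
    LOG.info(String.format(
        "Store %s: offset file last modified at %d ms, delete retention %d ms, stale=%b, store dir %s",
        storeName, lastModifiedMs, deleteRetentionMs, stale, loggedStoreDir));
    return stale;
  }
}
```

Keeping `isStale` as a pure function of three timestamps makes the boundary condition (strictly greater than the retention) easy to unit test without touching the file system.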