-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review153478
-----------------------------------------------------------



Looks pretty good, few final comments.


samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java (line 
253)
<https://reviews.apache.org/r/52476/#comment222805>

    Does jobConfig.getChangeLog...() (implicit conversion) not work?



samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala (line 57)
<https://reviews.apache.org/r/52476/#comment222811>

    getChangeLogDeleteRetentionsInMs



samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala (line 58)
<https://reviews.apache.org/r/52476/#comment222812>

    See if you can use the named operator instead of the symbolic operator.
    
    I think you might be able to .toMap on the list of pairs.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 104)
<https://reviews.apache.org/r/52476/#comment222827>

    Indent by 4 (or whatever continuation indent is)



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 106)
<https://reviews.apache.org/r/52476/#comment222821>

    If loggedStoreDir isn't present we put null into fileOffset. If that's the 
expected behavior, let's log at info in isStateLoggedStore if 
(!loggedStoreDir.exists())



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 108)
<https://reviews.apache.org/r/52476/#comment222815>

    Misleading comment, could be the other condition too.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 111)
<https://reviews.apache.org/r/52476/#comment222818>

    Will be useful to log at info the read file offset  here (or in 
readOffsetFile)



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 112)
<https://reviews.apache.org/r/52476/#comment222813>

    Indent by 4 (or whatever continuation indent is)



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 118)
<https://reviews.apache.org/r/52476/#comment222819>

    Add method description.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 130)
<https://reviews.apache.org/r/52476/#comment222824>

    First %s should be store name. Add another %s at the end for loggedStoreDir.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 133)
<https://reviews.apache.org/r/52476/#comment222822>

    log both last modified time and delete retention ms values too.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 133)
<https://reviews.apache.org/r/52476/#comment222823>

    Log both last modified time and delete retention ms values too.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 182)
<https://reviews.apache.org/r/52476/#comment222826>

    s/partition/logged storage partition to be consistent with next message.



samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala (line 144)
<https://reviews.apache.org/r/52476/#comment222830>

    implicit conversion should probably work.


- Prateek Maheshwari


On Oct. 19, 2016, 3:04 p.m., Shanthoosh Venkataraman wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/52476/
> -----------------------------------------------------------
> 
> (Updated Oct. 19, 2016, 3:04 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Every local task store is backed up by a kafka changelog topic. Due to log 
> compaction, delete tombstones of the changelog topic have a ttl of 
> delete.retention.ms. Replaying the events from the changelog that has missing 
> delete tombstones, would result in creation of an inconsistent local 
> store(due to the missing of some delete events). This patch deletes the local 
> stores in which difference between current time and last modified time of the 
> offset file is greater than delete.retention.ms during the container startup.
> 
> 
> Diffs
> -----
> 
>   samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 
> 9329edf7d724f3a0d9235354bb77936f713e3b5f 
>   samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala 
> be3f1068f7921c9c8f8697577efea6580672813e 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 05a996c98075ea8ed3767af666b9beeb1933f2a6 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
> 0b7bcdda1639eea8239a69c31bdf42558e9077d2 
>   
> samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
>  4d40f520e54beb643acd8410c772b75e2f6a9162 
>   samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 
> 973ab8cfb3d248bec7efe5e338f5e667f097556d 
> 
> Diff: https://reviews.apache.org/r/52476/diff/
> 
> 
> Testing
> -------
> 
> Unit testing and manual testing has been done to verify the functionality.
> 
> 
> Thanks,
> 
> Shanthoosh Venkataraman
> 
>

Reply via email to