-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/28016/#review61445
-----------------------------------------------------------


Nit: space after //

Just to make code more uniform.


docs/learn/documentation/versioned/jobs/configuration-table.html
<https://reviews.apache.org/r/28016/#comment103025>

    Do we really need this config? What's the use case where you'd want to set 
the changelog partition count to something other than the correct size (which 
is automatically determined in the container)?



samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
<https://reviews.apache.org/r/28016/#comment103024>

    Should use "stream" instead of "topic" in this API. Samza doesn't use 
"topic".
    
    I think this method should be createChangelogStream(String streamName, int 
partitions). The reason for keeping it changelog-specific is that we might want 
custom configs for changelog streams (e.g. custom replication count, segment 
size in Kafka, etc). If we don't make the API specific to the changelog, 
there's no way to have custom configs for changelogs (since we won't know if 
the topic being created is a changelog or not).
    
    Also, the pattern that I'm following for SAMZA-448, when adding 
create*Stream methods to the SystemAdmin, is to have an interface that 
extends SystemAdmin with just the create method in it (e.g. 
ChangelogSystemAdmin). Then, only KafkaSystemAdmin would implement the new 
interface, whereas the FileReaderSystemAdmin would not. This was based on Yan's 
feedback.
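    
    A rough sketch of the shape I have in mind is below. It is not taken from 
the patch: the SystemAdmin here is an empty stand-in for the real interface, 
and the Sketch* classes and ChangelogCreator helper are hypothetical names 
used only to show how a caller could check for the opt-in interface.

```java
import java.util.HashMap;
import java.util.Map;

// Empty stand-in for org.apache.samza.system.SystemAdmin so the sketch compiles alone.
interface SystemAdmin {}

// Opt-in interface: only systems that can create changelog streams implement it.
interface ChangelogSystemAdmin extends SystemAdmin {
    void createChangelogStream(String streamName, int partitions);
}

// Hypothetical Kafka-like admin; it only records what it was asked to create.
class SketchKafkaSystemAdmin implements ChangelogSystemAdmin {
    final Map<String, Integer> created = new HashMap<>();

    @Override
    public void createChangelogStream(String streamName, int partitions) {
        if (partitions <= 0) {
            throw new IllegalArgumentException("partitions must be positive: " + partitions);
        }
        // Changelog-specific settings (replication, segment size, ...) would be applied here.
        created.putIfAbsent(streamName, partitions);
    }
}

// Hypothetical file-reader-like admin; it does not implement the new interface.
class SketchFileReaderSystemAdmin implements SystemAdmin {}

// Hypothetical caller: creates the changelog only when the system supports it.
class ChangelogCreator {
    static boolean createIfSupported(SystemAdmin admin, String streamName, int partitions) {
        if (admin instanceof ChangelogSystemAdmin) {
            ((ChangelogSystemAdmin) admin).createChangelogStream(streamName, partitions);
            return true;
        }
        return false;
    }
}
```

    With this shape, admins that can't create streams need no stub 
implementation of the create method.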



samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
<https://reviews.apache.org/r/28016/#comment103026>

    Don't need this if you have a ChangelogSystemAdmin



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
<https://reviews.apache.org/r/28016/#comment103039>

    Remove. Move getChangeLogOldestOffsetsForPartition into TaskStorageManager.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
<https://reviews.apache.org/r/28016/#comment103030>

    Not needed.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
<https://reviews.apache.org/r/28016/#comment103036>

    Prefer = Map() over null.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
<https://reviews.apache.org/r/28016/#comment103040>

    createStreams



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
<https://reviews.apache.org/r/28016/#comment103034>

    No need for :Unit =



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
<https://reviews.apache.org/r/28016/#comment103031>

    Remove newlines.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
<https://reviews.apache.org/r/28016/#comment103033>

    Message in SamzaException.
    
    space after ,



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
<https://reviews.apache.org/r/28016/#comment103032>

    Indentation.



samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment103043>

    Don't need if we have a ChangelogSystemAdmin.



samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
<https://reviews.apache.org/r/28016/#comment103044>

    Don't need if we have a ChangelogSystemAdmin.



samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
<https://reviews.apache.org/r/28016/#comment103048>

    I don't think we need this, do we? Is there ever a case where you'd want to 
set the changelog partition count to a non-default number?



samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
<https://reviews.apache.org/r/28016/#comment103045>

    stores.%s.changelog.kafka
    
    (typo on changelog, and kafkaprops -> kafka)



samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
<https://reviews.apache.org/r/28016/#comment103047>

    map -> foreach



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment103054>

    Javadocs.
    
    After seeing the implementation, I prefer not passing a config object into 
the constructor, and just having a constructor with a lot of defaults (as we 
had before). The reasons that I don't like passing in a config are:
    
    1. It's impossible to know what to set in the config to create the 
KafkaSystemAdmin without reading its code.
    2. It bleeds wiring into the class itself. Wiring and behavior are two 
separate concerns, and should be treated separately.
    3. It makes writing tests harder, since you have to create a config object, 
rather than just passing in the params.
    
    Recommend switching back to the old style, and just adding new params to 
the constructor.
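    
    To illustrate, here is a minimal sketch of the split I'm suggesting. 
Everything in it is hypothetical (the SketchAdmin class, its fields, the 
default values, and the config keys read by the factory); it is not the real 
KafkaSystemAdmin signature. Java has no default arguments, so the defaults 
are expressed with an overloaded constructor; in Scala they would be default 
parameter values.

```java
import java.util.Map;

// Hypothetical admin: every dependency is an explicit constructor parameter.
class SketchAdmin {
    static final int DEFAULT_TIMEOUT_MS = 30000;            // illustrative default
    static final int DEFAULT_CHANGELOG_REPLICATION = 2;     // illustrative default

    final String systemName;
    final String brokerList;
    final int timeoutMs;
    final int changelogReplication;

    // Common case: callers name the system and brokers and accept the defaults.
    SketchAdmin(String systemName, String brokerList) {
        this(systemName, brokerList, DEFAULT_TIMEOUT_MS, DEFAULT_CHANGELOG_REPLICATION);
    }

    SketchAdmin(String systemName, String brokerList, int timeoutMs, int changelogReplication) {
        this.systemName = systemName;
        this.brokerList = brokerList;
        this.timeoutMs = timeoutMs;
        this.changelogReplication = changelogReplication;
    }
}

// Wiring lives in the factory, the only place that knows about config keys.
class SketchAdminFactory {
    static SketchAdmin fromConfig(String systemName, Map<String, String> config) {
        String prefix = "systems." + systemName + ".";       // hypothetical key layout
        String brokers = config.get(prefix + "brokers");
        int timeoutMs = Integer.parseInt(config.getOrDefault(
            prefix + "timeout.ms", String.valueOf(SketchAdmin.DEFAULT_TIMEOUT_MS)));
        int replication = Integer.parseInt(config.getOrDefault(
            prefix + "changelog.replication",
            String.valueOf(SketchAdmin.DEFAULT_CHANGELOG_REPLICATION)));
        return new SketchAdmin(systemName, brokers, timeoutMs, replication);
    }
}
```

    A test can then call new SketchAdmin("kafka", "localhost:9092") directly, 
without building a config object first.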



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment103057>

    Javadocs.



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment103056>

    Javadocs.



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment103065>

    This code looks to be pretty much copied from KafkaCheckpointManager. Can 
we just have the code once, and call it in both places? Maybe in KafkaUtil?



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment103061>

    Don't think we need this. Should just use numKafkaChangeLogPartitions.
    
    Also, we should try and standardize on "Changelog" not "ChangeLog".



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment103062>

    checkpoint topic -> changelog topic



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment103063>

    Checkpoint -> changelog.



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment103067>

    This looks to be copied from KafkaCheckpointManager as well. Can we 
converge on one util method in KafkaUtil, and call from both spots?



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment103068>

    Checkpoint again.



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/28016/#comment103059>

    What is this? Javadocs.



samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
<https://reviews.apache.org/r/28016/#comment103069>

    Not needed if we have ChangelogSystemAdmin.



samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
<https://reviews.apache.org/r/28016/#comment103071>

    Remove.


- Chris Riccomini


On Nov. 13, 2014, 11:58 p.m., Naveen Somasundaram wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/28016/
> -----------------------------------------------------------
> 
> (Updated Nov. 13, 2014, 11:58 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> I have added a new method to the system admin as discussed in the JIRA. The 
> task storage manager fetches all the information necessary and creates the 
> changelog topic using the system admin.
> 
> PENDING: I have to update the Samza docs with the new configurations added, 
> will update the rb with docs updates
> 
> 
> Diffs
> -----
> 
>   docs/learn/documentation/versioned/jobs/configuration-table.html 
> 4266a137ae003e946e11c122d94061c31d643c77 
>   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 
> 571c60631357ea9a0b4fa24e7253008619ef2f32 
>   
> samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
>  38e313f3c39454110efd354e6ca025869fa930cd 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> d91d6d7940bd07a145dd3b782a9239f24bb5cf2e 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
> b8719c36c2b9346bcd3f291e23b33d2c00cebfa9 
>   
> samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
>  98e92bc12f3e2827cdec02f1ce94d7e2314e4b4e 
>   
> samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 
> a79eccaa8fc18d197b77f9363f1814fefc4ac40d 
>   samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 
> 9fc1f56d4404ec7722c0d34fde2804e981b41309 
>   
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
>  5ac33ea36da451250655d9dd373692b964322b41 
>   
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
>  4ed5e881031e019d8df6de259cabb658820a3ba0 
>   
> samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
>  5ceb1093a66cb57e298d4b3ccdd24845dbb41b58 
>   samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java 
> fa1d51b290013a3913d64884dc43907a76670849 
>   
> samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
>  118f5eee22016db3b802c32fb26c5d72fa61f1a7 
> 
> Diff: https://reviews.apache.org/r/28016/diff/
> 
> 
> Testing
> -------
> 
> Modified TestStatefulTask to disable auto creation of topics and the test 
> seems to work.
> 
> 
> Thanks,
> 
> Naveen Somasundaram
> 
>
