> On Nov. 14, 2014, 5:32 p.m., Chris Riccomini wrote:
> > samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java, line 59
> > <https://reviews.apache.org/r/28016/diff/1/?file=762999#file762999line59>
> >
> >     Should use "stream" instead of "topic" in this API. Samza doesn't use 
> > "topic".
> >     
> >     I think this method should be createChangelogStream(String streamName, 
> > int partitions). The reason for keeping it changelog-specific is that we 
> > might want custom configs for changelog streams (e.g. custom replication 
> > count, segment size in Kafka, etc). If we don't make the API specific to 
> > the changelog, there's no way to have custom configs for changelogs (since 
> > we won't know if the topic being created is a changelog or not).
> >     
> >     Also, the pattern I'm following in SAMZA-448 for adding create*Stream 
> > methods to SystemAdmin is to define an interface that extends SystemAdmin 
> > with just the create method in it (e.g. ChangelogSystemAdmin). Then only 
> > KafkaSystemAdmin would implement the new interface, whereas 
> > FileReaderSystemAdmin would not. This was based on Yan's feedback.
> 
> Naveen Somasundaram wrote:
>     Makes sense. If more stream-creation needs come up, we can generalize 
> this by giving the API a custom configuration object (with reasonable 
> defaults) and refactoring our code. But for now we don't need that (we 
> probably have 2-3 stream-creation needs), so I'm not going to code for the 
> future :). I'll change it to a changelog-stream-specific function.
>     
>     As for the latter, as we discussed, I am not going to make the change to 
> have a separate interface. I agree that every interface implementer will 
> have to deal with the method (even if they are not interested in it), but 
> the only alternative, the solution proposed in SAMZA-448, is for the caller 
> to deal with it (check whether the instance is of the appropriate type 
> before using it), which makes the interface slightly harder to use IMO. So 
> I am still inclined to stick with this implementation.

There's a useful distinction here: some systems are able to implement a 
changelog, and some are not. For example, Amazon Kinesis currently only 
supports time-based retention of messages for 1 day, and doesn't have log 
compaction, so it's not suitable for changelog streams. Now what happens if you 
try to back a changelog with a system that isn't changelog-compatible? With the 
current implementation, assuming that systems which don't support changelogs 
throw an exception from createChangelogStream(), you'll get an exception at job 
startup.
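To make the startup failure concrete, here is a minimal Java sketch of the single-interface approach. All of the types below (SystemAdmin, KafkaLikeSystemAdmin, NoCompactionSystemAdmin, ContainerStartup) are hypothetical stand-ins written for illustration, not Samza's actual classes; only the createChangelogStream(String, int) signature comes from this thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, stripped-down stand-in for SystemAdmin. The real Samza
// interface has other methods, which are omitted here.
interface SystemAdmin {
    void createChangelogStream(String streamName, int partitions);
}

// Illustrative Kafka-like admin that just records the streams it "creates".
class KafkaLikeSystemAdmin implements SystemAdmin {
    final List<String> created = new ArrayList<>();

    @Override
    public void createChangelogStream(String streamName, int partitions) {
        created.add(streamName + ":" + partitions);
    }
}

// Illustrative admin for a system without log compaction (e.g. something
// Kinesis-like). It must still implement the method, so all it can do is fail.
class NoCompactionSystemAdmin implements SystemAdmin {
    @Override
    public void createChangelogStream(String streamName, int partitions) {
        throw new UnsupportedOperationException(
            "changelog streams are not supported by this system");
    }
}

// Hypothetical startup step: the problem only surfaces when the method is called.
class ContainerStartup {
    static String startStore(SystemAdmin admin, String changelog, int partitions) {
        try {
            admin.createChangelogStream(changelog, partitions);
            return "started";
        } catch (UnsupportedOperationException e) {
            return "failed: " + e.getMessage();
        }
    }
}
```

Every implementer, including ones that can never host a changelog, has to provide the method, and misconfiguration is only detected at the moment of the call.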

A sub-interface could allow slightly more graceful handling of this situation 
(since an early config verification step could detect whether the appropriate 
sub-interface is implemented, without calling the method).
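For comparison, a minimal Java sketch of the sub-interface approach, under the same assumptions: the types are hypothetical stand-ins rather than Samza's real classes, and systemName() and ChangelogConfigValidator are invented for illustration. A validation step can reject a misconfigured store by checking the type alone, before anything calls createChangelogStream().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, stripped-down stand-in for SystemAdmin (other methods omitted).
interface SystemAdmin {
    String systemName();
}

// Sub-interface that only changelog-capable systems implement.
interface ChangelogSystemAdmin extends SystemAdmin {
    void createChangelogStream(String streamName, int partitions);
}

// Illustrative Kafka-like admin: opts in to changelog support.
class KafkaLikeSystemAdmin implements ChangelogSystemAdmin {
    final List<String> created = new ArrayList<>();

    public String systemName() { return "kafka"; }

    public void createChangelogStream(String streamName, int partitions) {
        created.add(streamName + ":" + partitions);
    }
}

// Illustrative file-reader-like admin: simply does not implement the sub-interface.
class FileReaderLikeSystemAdmin implements SystemAdmin {
    public String systemName() { return "filereader"; }
}

class ChangelogConfigValidator {
    // Early verification: inspects the type without calling
    // createChangelogStream. Returns an error message, or null if OK.
    static String validate(SystemAdmin admin, String storeName) {
        if (admin instanceof ChangelogSystemAdmin) {
            return null;
        }
        return "store '" + storeName + "' uses system '" + admin.systemName()
            + "', which cannot host changelog streams";
    }

    // Caller side: the cast is only safe once validate() has passed.
    static void createChangelog(SystemAdmin admin, String streamName, int partitions) {
        ((ChangelogSystemAdmin) admin).createChangelogStream(streamName, partitions);
    }
}
```

The trade-off Naveen raises shows up in createChangelog(): the caller has to know about the type check and cast, which is the extra burden this design places on users of the interface.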


- Martin


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/28016/#review61445
-----------------------------------------------------------


On Nov. 20, 2014, 9:56 p.m., Naveen Somasundaram wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/28016/
> -----------------------------------------------------------
> 
> (Updated Nov. 20, 2014, 9:56 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> I have added a new method to the system admin, as discussed in the JIRA. The 
> task storage manager fetches all the necessary information and creates the 
> changelog topic using the system admin.
> 
> PENDING: I have to update the Samza docs with the new configurations added; 
> I will update the RB with the doc changes.
> 
> 
> Diffs
> -----
> 
>   .reviewboardrc 9339119e248e41f954d47e1d01a0f2e1130d349c 
>   docs/learn/documentation/versioned/jobs/configuration-table.html 4266a137ae003e946e11c122d94061c31d643c77 
>   samza-api/src/main/java/org/apache/samza/config/Config.java 2048e90e80e21086eb59be57f3bcd5ebf92b2811 
>   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 571c60631357ea9a0b4fa24e7253008619ef2f32 
>   samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 38e313f3c39454110efd354e6ca025869fa930cd 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala d91d6d7940bd07a145dd3b782a9239f24bb5cf2e 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b8719c36c2b9346bcd3f291e23b33d2c00cebfa9 
>   samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala 98e92bc12f3e2827cdec02f1ce94d7e2314e4b4e 
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala a79eccaa8fc18d197b77f9363f1814fefc4ac40d 
>   samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 9fc1f56d4404ec7722c0d34fde2804e981b41309 
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 5ac33ea36da451250655d9dd373692b964322b41 
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala 4ed5e881031e019d8df6de259cabb658820a3ba0 
>   samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala d660b91fb7a1029a47d5e083759b8971ad97e617 
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala 5ceb1093a66cb57e298d4b3ccdd24845dbb41b58 
>   samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java fa1d51b290013a3913d64884dc43907a76670849 
>   samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala 118f5eee22016db3b802c32fb26c5d72fa61f1a7 
> 
> Diff: https://reviews.apache.org/r/28016/diff/
> 
> 
> Testing
> -------
> 
> Modified TestStatefulTask to disable auto-creation of topics, and the test 
> seems to work.
> 
> 
> Thanks,
> 
> Naveen Somasundaram
> 
>
