> On March 31, 2014, 10:56 a.m., Martin Kleppmann wrote:
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala, line 94
> > <https://reviews.apache.org/r/19593/diff/4/?file=535014#file535014line94>
> >
> >     Is it certain that nextOffsets.size cannot change after start() has 
> > been called? (In other words, does the API require that all the calls to 
> > register() happen before the call to start()?) Otherwise 
> > perPartitionFetchThreshold could be outdated.
> >     
> >     I also noticed that the version you committed silently defaults 
> > fetchThreshold to perPartitionFetchThreshold if nextOffsets.size == 0 
> > (rather than throwing an exception). Would you mind explaining why? Just to 
> > improve my understanding of what's going on here. Thanks!

nextOffsets should not change, since it is only mutated in .register, which 
must only be called BEFORE start. This is part of the contract defined in 
SystemConsumer: "SystemStreamPartitions should only be registered [before] 
start is called." That said, we are not enforcing this anywhere, so it could 
be mutated after start has been called.
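
For illustration, here is a minimal sketch of how a consumer could enforce the 
register-before-start rule instead of relying on documentation alone. It 
assumes a simplified consumer that keys offsets by a plain String partition 
name and derives the per-partition threshold once in start(). SketchConsumer, 
threshold, and the String types are hypothetical; this is not the real 
SystemConsumer or KafkaSystemConsumer API.

```scala
import scala.collection.mutable

// Hypothetical, simplified consumer; not Samza's KafkaSystemConsumer.
class SketchConsumer(fetchThreshold: Int) {
  // Partition name -> next offset to read; only mutated by register().
  private val nextOffsets = mutable.Map[String, String]()
  @volatile private var started = false
  // Derived once in start(); would go stale if partitions were added later.
  private var perPartitionFetchThreshold = fetchThreshold

  def register(partition: String, offset: String): Unit = {
    // Enforce "register before start" so the threshold cannot go stale.
    if (started) {
      throw new IllegalStateException(
        s"Cannot register $partition after start() has been called.")
    }
    nextOffsets += partition -> offset
  }

  def start(): Unit = {
    started = true
    // Split the total threshold evenly across registered partitions,
    // falling back to the whole threshold when nothing is registered.
    perPartitionFetchThreshold =
      if (nextOffsets.isEmpty) fetchThreshold
      else fetchThreshold / nextOffsets.size
  }

  def threshold: Int = perPartitionFetchThreshold
}
```

With a total threshold of 50000 and two registered partitions, start() yields 
25000 per partition, and any later register() call fails fast instead of 
silently leaving that value outdated.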

I initially had KafkaSystemConsumer throw an exception when nextOffsets was 
empty. This caused TaskStorageManager to fail in cases where the 
changeLogOldestOffsets value for a restore stream was null (the store was 
empty), since the storage manager only registers partitions with a non-null 
offset, but starts all consumers. Technically, this is a violation of the 
SystemConsumer contract, "Register will be called one or more times before 
start." I made the change without thinking this through, and we should probably 
fix TaskStorageManager to only start consumers with offsets, and change 
KafkaSystemConsumer to throw an exception when start is called with an empty 
nextOffsets map. Want to open a JIRA for this?
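
A rough sketch of the proposed fix, under similar simplifying assumptions: 
StrictConsumer stands in for KafkaSystemConsumer, RestoreSketch.startConsumers 
stands in for the TaskStorageManager restore logic, and a None oldest offset 
models an empty store (the null changeLogOldestOffsets case). All of these 
names, and the "-changelog" partition naming, are hypothetical, not Samza 
classes or methods.

```scala
import scala.collection.mutable

// Hypothetical stand-in for KafkaSystemConsumer after the proposed change.
class StrictConsumer(fetchThreshold: Int) {
  private val nextOffsets = mutable.Map[String, String]()
  var perPartitionFetchThreshold: Int = 0

  def register(partition: String, offset: String): Unit =
    nextOffsets += partition -> offset

  def hasRegistrations: Boolean = nextOffsets.nonEmpty

  def start(): Unit = {
    // Starting with nothing registered violates the "register one or more
    // times before start" contract, so fail fast instead of defaulting.
    if (nextOffsets.isEmpty) {
      throw new IllegalStateException(
        "start() called with an empty nextOffsets map.")
    }
    perPartitionFetchThreshold = fetchThreshold / nextOffsets.size
  }
}

// Hypothetical caller in the style of TaskStorageManager.
object RestoreSketch {
  def startConsumers(
      oldestOffsets: Map[String, Option[String]],
      consumers: Map[String, StrictConsumer]): Set[String] = {
    // Register only stores whose changelog has an oldest offset.
    oldestOffsets.foreach {
      case (store, Some(offset)) =>
        consumers(store).register(s"$store-changelog", offset)
      case (_, None) => // empty store: nothing to restore, nothing to register
    }
    // Start only the consumers that actually received a registration.
    val toStart = consumers.filter { case (_, c) => c.hasRegistrations }
    toStart.values.foreach(_.start())
    toStart.keySet
  }
}
```

Filtering on the caller side keeps the consumer strict: the empty-store case 
never reaches start(), so the exception only fires on a genuine contract 
violation.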


- Chris


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/19593/#review39034
-----------------------------------------------------------


On March 25, 2014, 3:38 a.m., Chris Riccomini wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/19593/
> -----------------------------------------------------------
> 
> (Updated March 25, 2014, 3:38 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Bugs: SAMZA-203
>     https://issues.apache.org/jira/browse/SAMZA-203
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> fixing <= nextOffset check
> 
> 
> re-add .size check
> 
> 
> add test for fetch threshold
> 
> 
> double check that nextOffset.size > 0, and add docs to fetchThreshold
> 
> 
> 
> fix whitespace
> 
> 
> drop sleep ms in broker proxy to 100ms not 1000ms. fix thread name in broker 
> proxy. bump fetch threshold in kafka system consumer to 50000. use 
> per-partition fetch thresholds now. catch interrupt exceptions in broker 
> proxy so we don't pollute stderr when we close a broker proxy.
> 
> 
> fix kafka config's samza.fetch.threshold punctuation
> 
> 
> Diffs
> -----
> 
>   samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 978620a413e231e8d6c57544b3082d286ea20170 
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala bca2f8660c3e5952f8dbde606b6f9b2760fc526f 
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala 8ad97dfde939bd85f8b5db18f0790f30360a9274 
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala feecc582e9a52c456aa2bd8830226590986b7ad0 
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/19593/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Chris Riccomini
> 
>
