----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/19593/#review39034 -----------------------------------------------------------
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala <https://reviews.apache.org/r/19593/#comment71425> Is it certain that nextOffsets.size cannot change after start() has been called? (In other words, does the API require that all the calls to register() happen before the call to start()?) Otherwise perPartitionFetchThreshold could be outdated. I also noticed that the version you committed silently defaults fetchThreshold to perPartitionFetchThreshold if nextOffsets.size == 0 (rather than throwing an exception). Would you mind explaining why? Just to improve my understanding of what's going on here. Thanks! - Martin Kleppmann On March 25, 2014, 3:38 a.m., Chris Riccomini wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/19593/ > ----------------------------------------------------------- > > (Updated March 25, 2014, 3:38 a.m.) > > > Review request for samza. > > > Bugs: SAMZA-203 > https://issues.apache.org/jira/browse/SAMZA-203 > > > Repository: samza > > > Description > ------- > > fixing <= nextOfset check > > > re-add .size check > > > add test for fetch threshold > > > double check that nextOffset.size > 0, and add docs to fetchThreshold > > > double check that nextOffset.size > 0, and add docs to fetchThreshold > > > fix whitespace > > > drop sleep ms in broker proxy to 100ms not 1000ms. fix thread name in broker > proxy. bump fetch threshold in kafka system consumer to 50000. use > per-partition fetch thresholds now. catch interrupt exceptions in broker > proxy so we don't pollute stderr when we close a broker proxy. > > > fix kafka config's samza.fetch.threshold punctuation > > > Diffs > ----- > > samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala > 978620a413e231e8d6c57544b3082d286ea20170 > samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala > bca2f8660c3e5952f8dbde606b6f9b2760fc526f > > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala > 8ad97dfde939bd85f8b5db18f0790f30360a9274 > > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala > feecc582e9a52c456aa2bd8830226590986b7ad0 > > samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala > PRE-CREATION > > Diff: https://reviews.apache.org/r/19593/diff/ > > > Testing > ------- > > > Thanks, > > Chris Riccomini > >
