> On March 31, 2014, 10:56 a.m., Martin Kleppmann wrote:
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala, line 94
> > <https://reviews.apache.org/r/19593/diff/4/?file=535014#file535014line94>
> >
> >     Is it certain that nextOffsets.size cannot change after start() has
> >     been called? (In other words, does the API require that all the calls to
> >     register() happen before the call to start()?) Otherwise
> >     perPartitionFetchThreshold could be outdated.
> >
> >     I also noticed that the version you committed silently defaults
> >     fetchThreshold to perPartitionFetchThreshold if nextOffsets.size == 0
> >     (rather than throwing an exception). Would you mind explaining why? Just to
> >     improve my understanding of what's going on here. Thanks!

nextOffsets should not change, since it is only mutated in .register, which must only be called BEFORE start. This is in the contract defined in SystemConsumer, "SystemStreamPartitions should only be registered [before] start is called." That said, we are not enforcing this anywhere, so it could be mutated after start has been called.

I initially had KafkaSystemConsumer throw an exception when nextOffsets was empty. This caused TaskStorageManager to fail in cases where the changeLogOldestOffsets value for a restore stream was null (the store was empty), since the storage manager only registers partitions with a non-null offset, but starts all consumers. Technically, this is a violation of the SystemConsumer contract, "Register will be called one or more times before start."

I made the change without thinking this through. We should probably fix TaskStorageManager to only start consumers with offsets, and change KafkaSystemConsumer to throw an exception when start is called with an empty nextOffsets map. Want to open a JIRA for this?


- Chris


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/19593/#review39034
-----------------------------------------------------------


On March 25, 2014, 3:38 a.m., Chris Riccomini wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/19593/
> -----------------------------------------------------------
>
> (Updated March 25, 2014, 3:38 a.m.)
>
>
> Review request for samza.
>
>
> Bugs: SAMZA-203
>     https://issues.apache.org/jira/browse/SAMZA-203
>
>
> Repository: samza
>
>
> Description
> -------
>
> fixing <= nextOfset check
>
>
> re-add .size check
>
>
> add test for fetch threshold
>
>
> double check that nextOffset.size > 0, and add docs to fetchThreshold
>
>
> double check that nextOffset.size > 0, and add docs to fetchThreshold
>
>
> fix whitespace
>
>
> drop sleep ms in broker proxy to 100ms not 1000ms. fix thread name in broker
> proxy. bump fetch threshold in kafka system consumer to 50000. use
> per-partition fetch thresholds now. catch interrupt exceptions in broker
> proxy so we don't pollute stderr when we close a broker proxy.
>
>
> fix kafka config's samza.fetch.threshold punctuation
>
>
> Diffs
> -----
>
>   samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 978620a413e231e8d6c57544b3082d286ea20170
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala bca2f8660c3e5952f8dbde606b6f9b2760fc526f
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala 8ad97dfde939bd85f8b5db18f0790f30360a9274
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala feecc582e9a52c456aa2bd8830226590986b7ad0
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala PRE-CREATION
>
> Diff: https://reviews.apache.org/r/19593/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Chris Riccomini
>
>
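
For reference, the behaviour proposed in the reply above (reject register() after start(), and reject start() with an empty nextOffsets map) could look roughly like the sketch below. This is not the actual KafkaSystemConsumer: the class name SketchConsumer, the String-typed partition and offset, the threshold accessor, and the division of fetchThreshold by nextOffsets.size are all assumptions made for illustration. Only the 50000 default and the register/start ordering come from the thread.

```scala
import scala.collection.mutable

// Hypothetical stand-in used only to illustrate the contract; the real
// consumer works with SystemStreamPartition objects and Kafka offsets.
class SketchConsumer(fetchThreshold: Int = 50000) {
  private val nextOffsets = mutable.Map[String, String]()
  private var started = false
  private var perPartitionFetchThreshold = fetchThreshold

  def register(partition: String, offset: String): Unit = {
    // Enforce "register before start" instead of trusting callers.
    if (started) {
      throw new IllegalStateException(
        s"Cannot register $partition after start() has been called.")
    }
    nextOffsets(partition) = offset
  }

  def start(): Unit = {
    // Fail loudly rather than silently falling back to a default threshold.
    if (nextOffsets.isEmpty) {
      throw new IllegalStateException(
        "start() called before any partition was registered.")
    }
    // Assumed derivation: split the total threshold evenly across partitions.
    // Safe to compute once, because register() is rejected from here on.
    perPartitionFetchThreshold = fetchThreshold / nextOffsets.size
    started = true
  }

  def threshold: Int = perPartitionFetchThreshold
}
```

With a check like this in place, TaskStorageManager would also have to change so that it only starts consumers for which it registered at least one partition. Otherwise restoring an empty store would hit the new exception.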
