> On April 4, 2014, 8:30 p.m., Martin Kleppmann wrote:
> > samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala,
> > line 97
> > <https://reviews.apache.org/r/20022/diff/2/?file=548469#file548469line97>
> >
> > I would find the documentation more helpful if it didn't describe what
> > the code does to this variable (I can find that out by reading the code
> > itself), but rather *why* it exists and what problem it solves.

I tried to describe more about "why" in the latest patch.

> On April 4, 2014, 8:30 p.m., Martin Kleppmann wrote:
> > samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala,
> > line 166
> > <https://reviews.apache.org/r/20022/diff/2/?file=548469#file548469line166>
> >
> > Not clear whether you mean that maxMsgsPerStreamPartition defaults to
> > 1000, or refreshThreshold defaults to 1000.

Updated to be explicit.

> On April 4, 2014, 8:30 p.m., Martin Kleppmann wrote:
> > samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala,
> > line 174
> > <https://reviews.apache.org/r/20022/diff/2/?file=548469#file548469line174>
> >
> > I think this is a slight change of semantics. The previous computation
> > returned the number of messages that had not yet been given to the chooser,
> > whereas totalUnprocessedMessages is only decremented after the chooser has
> > chosen the message. Don't think that's a problem, just wanted to point it
> > out in case it wasn't deliberate.

Yes, it was deliberate. I didn't think it was a big deal, though.

> On April 4, 2014, 8:30 p.m., Martin Kleppmann wrote:
> > samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala,
> > line 300
> > <https://reviews.apache.org/r/20022/diff/2/?file=548469#file548469line300>
> >
> > I have difficulty convincing myself that this logic is correct -- is it
> > always the case that the partition is neededByChooser in this case? It's
> > probably correct, but it's very subtle logic that's easy to get wrong.
> > Would it be possible to express this in a way that's easier to reason about?
> >
> > For example, mapping each systemStreamPartition to one of enum {
> > IN_CHOOSER, NEEDED_BY_CHOOSER, SKIPPING_CHOOSER } would make clear that
> > each partition is in exactly one of those three states at any one time.

I moved this logic into MessageChooserBuffer. Hopefully that makes things a little clearer.

- Chris


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/20022/#review39588
-----------------------------------------------------------


On April 17, 2014, 5:30 p.m., Chris Riccomini wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/20022/
> -----------------------------------------------------------
>
> (Updated April 17, 2014, 5:30 p.m.)
>
>
> Review request for samza.
>
>
> Repository: samza
>
>
> Description
> -------
>
> move coordinator to buffer, and add javadocs
>
>
> move unprocessed messages into coordinator
>
>
> adding a message chooser coordinator to try and extract some of the confusing
> logic from system consumers into a separate class
>
>
> don't hard code the refresh threshold
>
>
> bump default max msgs per stream partition up to 10k
>
>
> add an unprocessed message counter
>
>
> black list empty system stream partitions
>
>
> Diffs
> -----
>
>   samza-core/src/main/scala/org/apache/samza/system/MessageChooserBuffer.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala bbbacb59866c2374853052f7cc11826552f5fb01
>
> Diff: https://reviews.apache.org/r/20022/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Chris Riccomini
>
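Martin's suggestion (for the logic at line 300 of the diff) of mapping each systemStreamPartition to exactly one of IN_CHOOSER, NEEDED_BY_CHOOSER and SKIPPING_CHOOSER can be sketched in Scala as follows. This is a minimal illustration under stated assumptions, not the MessageChooserBuffer code from the patch. `Ssp` is a hypothetical stand-in for Samza's `SystemStreamPartition`. `ChooserStateTracker` and its method names are invented for the example. The transition rules, in particular what SKIPPING_CHOOSER means, are assumptions. The point it shows is that keeping one state per partition in a single map makes "exactly one state at a time" hold by construction, and each transition can check its own precondition.

```scala
import scala.collection.mutable

// Hypothetical stand-in for org.apache.samza.system.SystemStreamPartition,
// defined locally so this sketch compiles on its own.
final case class Ssp(system: String, stream: String, partition: Int)

// One value per state from the review comment. A partition maps to exactly
// one of these, so it cannot be both IN_CHOOSER and NEEDED_BY_CHOOSER.
sealed trait ChooserState
case object InChooser extends ChooserState       // chooser holds a message from it
case object NeededByChooser extends ChooserState // chooser wants its next message
case object SkippingChooser extends ChooserState // withheld from chooser (assumed meaning)

// Hypothetical tracker: the transition rules are assumptions for illustration,
// not the logic in Samza's MessageChooserBuffer.
final class ChooserStateTracker(partitions: Iterable[Ssp]) {
  // A single map is the only source of truth for each partition's state.
  private val states = mutable.Map.empty[Ssp, ChooserState]
  partitions.foreach(p => states(p) = NeededByChooser)

  def stateOf(p: Ssp): ChooserState = states(p)

  // Fail fast (IllegalArgumentException) if a transition starts from the wrong state.
  private def move(p: Ssp, from: ChooserState, to: ChooserState): Unit = {
    val current = states(p)
    require(current == from, s"$p is $current, expected $from")
    states(p) = to
  }

  // A buffered message from p was passed to the chooser.
  def offeredToChooser(p: Ssp): Unit = move(p, NeededByChooser, InChooser)

  // The chooser returned p's message, so it now needs the next one.
  def chosen(p: Ssp): Unit = move(p, InChooser, NeededByChooser)

  // Stop feeding the chooser from p (for example, because p appeared empty).
  def skip(p: Ssp): Unit = move(p, NeededByChooser, SkippingChooser)

  // Start feeding the chooser from p again.
  def resume(p: Ssp): Unit = move(p, SkippingChooser, NeededByChooser)

  // Partitions whose next buffered message should go to the chooser.
  def neededByChooser: Set[Ssp] =
    states.iterator.collect { case (p, NeededByChooser) => p }.toSet
}
```

With this shape, a question like "is the partition always neededByChooser here?" becomes a `require` that fails at the illegal transition instead of a property the reader has to prove. A sealed trait is used rather than a Scala 3 `enum` so the sketch also compiles on Scala 2.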
