> On March 18, 2015, 9:01 p.m., Navina Ramesh wrote:
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala, line 49
> > <https://reviews.apache.org/r/32155/diff/1/?file=897641#file897641line49>
> >
> > Can you explain the purpose of "failedSources"?

It is used to record the sources that failed. More than one source may fail, so we want to know all of them and stop the producer only after flushing all the sources. Does that make sense?


> On March 18, 2015, 9:01 p.m., Navina Ramesh wrote:
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala, line 148
> > <https://reviews.apache.org/r/32155/diff/1/?file=897641#file897641line148>
> >
> > Why can't you do the failedSources empty check here instead of Line 153?

This is because there may be more than one source, say source1 and source2. Suppose source1 fails and source2 succeeds, and we flush each of them. We do not close the producer after flushing source1; instead, we close it after flushing source2. When flushing source2, the sendFailed.get() is not run. That's why the stop is outside of the conditions.


- Yan


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32155/#review76944
-----------------------------------------------------------


On March 17, 2015, 9:37 a.m., Yan Fang wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32155/
> -----------------------------------------------------------
>
> (Updated March 17, 2015, 9:37 a.m.)
>
>
> Review request for samza.
>
>
> Bugs: SAMZA-458
>     https://issues.apache.org/jira/browse/SAMZA-458
>
>
> Repository: samza
>
>
> Description
> -------
>
> add flush in close method
> only throw exceptions after flushing all sources
> add unit test
>
>
> Diffs
> -----
>
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala 83668dd
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala ca10ea5
>
> Diff: https://reviews.apache.org/r/32155/diff/
>
>
> Testing
> -------
>
> ran unit tests and integration test
>
>
> Thanks,
>
> Yan Fang
>
>
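
For readers following the source1/source2 discussion above, here is a minimal sketch of one reading of that behavior. It is not the code from the patch: the class FlushSketch, its constructor parameters, and the fields flushed and stopped are hypothetical names made up for illustration, and a plain set of failing source names stands in for the real send-failure check. Only the name failedSources comes from the review. The point it tries to show is that the stop check sits outside the failure branch, so it still runs when the last source to flush is one that succeeded.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the producer; not Samza's KafkaSystemProducer.
// `registered` is every known source, `failing` simulates sources whose sends failed.
class FlushSketch(registered: Set[String], failing: Set[String]) {
  // Sources whose flush found a failed send.
  private val failedSources = mutable.Set.empty[String]
  // Sources that have been flushed so far (assumption made for this sketch).
  private val flushed = mutable.Set.empty[String]
  // Set to true once the producer would be stopped.
  var stopped = false

  def flush(source: String): Unit = {
    flushed += source
    if (failing.contains(source)) {
      // Record the failure, but do not stop yet: other sources still need flushing.
      failedSources += source
    }
    // This check is outside the failure branch on purpose. If it lived inside,
    // flushing a healthy source last would never trigger the stop.
    if (registered.forall(flushed.contains) && failedSources.nonEmpty) {
      stopped = true
    }
  }

  // Immutable snapshot of the failures recorded so far.
  def failed: Set[String] = failedSources.toSet
}
```

With registered sources source1 and source2, where only source1 fails, flushing source1 records the failure and leaves stopped as false; flushing source2 afterwards sets stopped to true, and failed contains only source1.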