> On June 3, 2016, 7:18 p.m., Chris Pettitt wrote:
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala, line 123
> > <https://reviews.apache.org/r/48213/diff/1/?file=1405868#file1405868line123>
> >
> > Producer needs to be volatile or you need to hold the producerLock to
> > get the producer.
> >
> > Also this new design can lead to a somewhat bad error - if the producer
> > is shut down and then send is used you'll just get an NPE versus an error
> > about the producer being stopped. I think the latter would make things
> > easier from a troubleshooting perspective.

The producer is volatile now. Instead of using null, I added a flag indicating whether the producer has been created. So if the producer is closed, other threads still holding a reference to it will get a closed-producer error instead of an NPE.

- Xinyu


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48213/#review136097
-----------------------------------------------------------


On June 3, 2016, 10:09 p.m., Xinyu Liu wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48213/
> -----------------------------------------------------------
>
> (Updated June 3, 2016, 10:09 p.m.)
>
>
> Review request for samza, Chris Pettitt, Navina Ramesh, and Yi Pan (Data
> Infrastructure).
>
>
> Repository: samza
>
>
> Description
> -------
>
> All the system producers need to be thread safe in order to be used in
> multithreaded tasks. The following are the changes
> (ElasticSearchSystemProducer is already thread safe so no change made there):
>
> In KafkaSystemProducer, remove the buggy retry logic and treat any exception
> as fatal.
> In HdfsSystemProducer, add synchronization lock to all public methods.
>
>
> Diffs
> -----
>
>   samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala
>     1f4b5c46436e44b7c7cd1a49689c4f43f1f6ed1b
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
>     3769e103616dc0f1fd869706cc086e24cd926c48
>   samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java
>     04c9113fd6c3dd56c49ff46c8c1c0ff12f68e5e2
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
>     8e32bba6ced090f0fc8d4e5176fe0788df36981d
>
> Diff: https://reviews.apache.org/r/48213/diff/
>
>
> Testing
> -------
>
> Unit tests and local testing.
>
>
> Thanks,
>
> Xinyu Liu
>
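
For readers following the thread, here is a minimal sketch of the volatile-plus-flag idea described in the reply above. It is not the code from the patch: it is written in Java rather than Scala for brevity, and the names ProducerHolderSketch, StubProducer, producerCreated, and getOrCreateProducer are assumptions made up for illustration. Only producerLock is a name taken from the review comment. StubProducer is a hypothetical stand-in that reports an error once closed.

```java
// Illustrative sketch only; all names except producerLock are hypothetical.
class ProducerHolderSketch {

    /** Hypothetical stand-in for the real producer: fails loudly after close(). */
    static final class StubProducer {
        private volatile boolean closed = false;

        void send(String message) {
            if (closed) {
                throw new IllegalStateException("producer is closed");
            }
            // A real producer would hand the message to the broker here.
        }

        void close() {
            closed = true;
        }
    }

    private final Object producerLock = new Object();

    // volatile: threads that read the field without holding producerLock
    // still see the most recently written reference.
    private volatile StubProducer producer;

    // Flag used instead of comparing the reference against null.
    private volatile boolean producerCreated = false;

    /** Lazily creates the producer once; later callers reuse the same instance. */
    StubProducer getOrCreateProducer() {
        if (!producerCreated) {
            synchronized (producerLock) {
                if (!producerCreated) {
                    producer = new StubProducer();
                    producerCreated = true; // written after producer, so readers see both
                }
            }
        }
        return producer;
    }

    void send(String message) {
        getOrCreateProducer().send(message);
    }

    /** Closes the producer but keeps the reference, so it is never reset to null. */
    void stop() {
        synchronized (producerLock) {
            if (producerCreated) {
                producer.close();
            }
        }
    }
}
```

In this sketch, a send after stop reaches the closed producer and surfaces its "producer is closed" error, because the reference is never set back to null. That is the troubleshooting behavior the review comment asked for.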