> On Jan. 21, 2015, 5:50 p.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala, line 125
> > <https://reviews.apache.org/r/29899/diff/3/?file=827601#file827601line125>
> >
> >     If there's an exception in .get(), do we need to throw it to retry?

My understanding is that .get() is a blocking call. If there is an exception in the callback, isn't it propagated up to the get() call and caught in the retry loop?


> On Jan. 21, 2015, 5:50 p.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala, line 191
> > <https://reviews.apache.org/r/29899/diff/3/?file=827603#file827603line191>
> >
> >     I don't think we need all of these. Anything that we're leaving the
> >     same as Kafka's default, we can safely not set. We only need to set
> >     non-default properties.
> >
> >     For any defaults that we don't use, can you remove the constants above,
> >     just to shrink the copy/paste a bit.

I think the idea was to keep all the defaults in one config file and make them accessible across the modules. Since Kafka doesn't expose the default values, I chose to add them here. But after all the producer-related changes, I don't think we need ALL the producer defaults to be accessible. We only use the reconnectInterval value, and we can return a default for it. Will change this. More details in KAFKA-1794.


> On Jan. 21, 2015, 5:50 p.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala, line 232
> > <https://reviews.apache.org/r/29899/diff/3/?file=827603#file827603line232>
> >
> >     Same as clientId comment below.

Sure.


> On Jan. 21, 2015, 5:50 p.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala, line 233
> > <https://reviews.apache.org/r/29899/diff/3/?file=827603#file827603line233>
> >
> >     Don't need this if you declare `val clientId` in the constructor.

Yes, agreed.


> On Jan. 21, 2015, 5:50 p.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala, line 49
> > <https://reviews.apache.org/r/29899/diff/3/?file=827605#file827605line49>
> >
> >     Can sendFailed be eliminated if we just define exceptionThrown.get() ==
> >     null as sendFailed=false, and exceptionThrown.get() != null as
> >     sendFailed=true?

It's true that we can eliminate sendFailed by relying on exceptionThrown. I felt that this would overload a shared variable. My idea was to use "sendFailed" to decide whether to retry the loop, while "exceptionThrown" is just a shared buffer between the threads that holds the exception. It does not control the retry behavior of the loop. Let me know if you would rather go with the approach you suggested; I haven't verified all the corner cases for it yet.


> On Jan. 21, 2015, 5:50 p.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala, line 50
> > <https://reviews.apache.org/r/29899/diff/3/?file=827605#file827605line50>
> >
> >     new AtomicReference() is safer.

Ok.


> On Jan. 21, 2015, 5:50 p.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala, line 87
> > <https://reviews.apache.org/r/29899/diff/3/?file=827605#file827605line87>
> >
> >     This call is cached, right? We're not going to trigger network traffic
> >     every time we call partitionsFor?

We are not caching it ourselves. I believe the Kafka layer caches the metadata by default and refreshes it periodically.


> On Jan. 21, 2015, 5:50 p.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala, line 112
> > <https://reviews.apache.org/r/29899/diff/3/?file=827605#file827605line112>
> >
> >     This metric should be outside of the callback. The metric currently
> >     counts every time that a message was "sent" (enqueued in the buffer).
> >     Rather than changing the meaning of an existing metric, we should add new
> >     metrics where we need them.
> >
> >     I think we might need to re-think these metrics:
> >
> >         val reconnects = newCounter("producer-reconnects")
> >         val sends = newCounter("producer-sends")
> >         val flushes = newCounter("flushes")
> >         val flushSizes = newCounter("flush-sizes")
> >         val flushMs = newTimer("flush-ms")
> >
> >         def setBufferSize(source: String, getValue: () => Int) {
> >           newGauge("%s-producer-buffer-size" format source, getValue)
> >         }

Yeah, I treated metrics as an afterthought. Thanks for bringing it up.

  producer-reconnects, flushSizes: I don't think these are useful anymore.
  flushMs: same as before.
  flushes: same as before.
  producer-sends: If it means "enqueued in the buffer", then we can move it outside the callback. If we really want to know whether the messages were sent by the Kafka producer, we should count it in the callback. I don't think Kafka provides any feedback other than the future for each send.

Additionally, I think we can add the following:

  retries: incremented whenever a retriable exception is thrown (after the warn statement).
  flush-failed: incremented whenever a flush fails.
  send-failed: incremented when a send fails with an exception.


> On Jan. 21, 2015, 5:50 p.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala, line 143
> > <https://reviews.apache.org/r/29899/diff/3/?file=827605#file827605line143>
> >
> >     Newline log messages are generally not good. Also, prefer hiding stack
> >     trace when retrying. See BrokerProxy example:
> >
> >         warn("Restarting consumer due to %s. Turn on debugging to get a full stack trace." format exception)
> >         debug("Exception detail:", exception)

Ok.


> On Jan. 21, 2015, 5:50 p.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala, line 160
> > <https://reviews.apache.org/r/29899/diff/3/?file=827605#file827605line160>
> >
> >     Seems like this code appears twice, and we also have a stop() method.
> >     Can you combine everything into a single stop() method, and call it where
> >     appropriate?

Yeah, makes sense.


> On Jan. 21, 2015, 5:50 p.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala, line 32
> > <https://reviews.apache.org/r/29899/diff/3/?file=827606#file827606line32>
> >
> >     Is this used anywhere?

Nah. That needs to go away.


> On Jan. 21, 2015, 5:50 p.m., Chris Riccomini wrote:
> > samza-kafka/src/test/scala/org/apache/samza/system/kafka/MockKafkaProducer.java, line 26
> > <https://reviews.apache.org/r/29899/diff/3/?file=827609#file827609line26>
> >
> >     Is this copy/pasted from Kafka as well? If so, should add a comment
> >     here, and point to the KAFKA-1861 JIRA. Should also include in the same
> >     follow-on SAMZA JIRA as the test utils class, below.

No. This is our own :) The MockProducer in the Kafka API was not fulfilling our needs.


> On Jan. 21, 2015, 5:50 p.m., Chris Riccomini wrote:
> > samza-kafka/src/test/scala/org/apache/samza/utils/TestUtils.java, line 33
> > <https://reviews.apache.org/r/29899/diff/3/?file=827612#file827612line33>
> >
> >     Should open a tracker SAMZA ticket linked to KAFKA-1861 when SAMZA-227
> >     is committed.

Will do.


- Navina


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29899/#review68934
-----------------------------------------------------------


On Jan. 21, 2015, 3:11 a.m., Navina Ramesh wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/29899/
> -----------------------------------------------------------
>
> (Updated Jan. 21, 2015, 3:11 a.m.)
>
>
> Review request for samza, Chris Riccomini, Guozhang Wang, and Jay Kreps.
>
>
> Bugs: SAMZA-227
>     https://issues.apache.org/jira/browse/SAMZA-227
>
>
> Repository: samza
>
>
> Description
> -------
>
> Modified logic in KafkaSystemProducer for send & flush based on the behavior
> of the new java-based Kafka producer API.
>
> Added "MockKafkaProducer" in the Samza layer to mock out the buffering behavior
> provided by the Kafka producer. The MockProducer exposed by Kafka does not
> provide sufficient control for writing unit tests.
>
> Producer config for Kafka is unified in "KafkaProducerConfig" with
> appropriate default values. These can be overridden while instantiating the
> producer config.
>
>
> Diffs
> -----
>
>   build.gradle 7a40ad4ae916610186848c06c4577e7067de98ee
>   gradle/dependency-versions.gradle 44dd42603e93788562fd64c68312570cee71a2aa
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala 1d5627d0c561a0be6b48ee307b755958e62b783e
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala f2defbd39708e959edb1d6674e542b5bc9e02666
>   samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala e57b8ba1e09765774314ec469645b5d0bbde060f
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala 4506ea367eec4e40da45feee777ba73069025a4c
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala a0e1ccbfe9dc4fd26ca6b30fc2d1348fb7d007e4
>   samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala f1b7511775703775eaa5172d7da88d302a89aa2e
>   samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala 553d6b4d6ffe21f4a92c8c347e835d95d71b5863
>   samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala 0e1c38e5d68f2f3e42ecdb58297a11ff5d29374d
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/MockKafkaProducer.java PRE-CREATION
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala c759a7bea7b67714eaa90a97f828079f26acbca4
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala 72b36f774b2b8845539f26fc592244353cf300cd
>   samza-kafka/src/test/scala/org/apache/samza/utils/TestUtils.java PRE-CREATION
>   samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala ca25258217e5ebc44b34fbc4d69ecb28c81df618
>
> Diff: https://reviews.apache.org/r/29899/diff/
>
>
> Testing
> -------
>
> ./gradlew clean build - SUCCESSFUL
>
>
> Thanks,
>
> Navina Ramesh
>
>
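
P.S. On the .get() question for KafkaCheckpointManager above: here is a minimal sketch of the behavior I am assuming, written against plain java.util.concurrent rather than the Kafka producer. The class name, sendWithRetry, and flakySend are hypothetical and are not code from the patch. It only shows that a future completed with a failure makes the blocking get() throw an ExecutionException on the calling thread, where a retry loop can catch it. Whether the future returned by the producer's send() behaves the same way in every failure case is the assumption that still needs checking.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Supplier;

public class BlockingGetRetrySketch {

    /**
     * Hypothetical helper: calls send, blocks on the returned future, and
     * retries when get() reports a failure. Returns the attempt that succeeded.
     */
    static int sendWithRetry(Supplier<Future<String>> send, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                send.get().get(); // Future.get() blocks until the future completes
                return attempt;
            } catch (ExecutionException e) {
                // The failure recorded on the future surfaces here, on the caller's thread.
                System.out.println("Attempt " + attempt + " failed due to " + e.getCause() + ". Retrying.");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for send", e);
            }
        }
        throw new IllegalStateException("Send failed after " + maxAttempts + " attempts");
    }

    /** Hypothetical stand-in for a send that fails a fixed number of times, then succeeds. */
    static Supplier<Future<String>> flakySend(int failures) {
        int[] calls = {0};
        return () -> {
            CompletableFuture<String> result = new CompletableFuture<>();
            if (calls[0]++ < failures) {
                result.completeExceptionally(new RuntimeException("broker unavailable"));
            } else {
                result.complete("ack");
            }
            return result;
        };
    }

    public static void main(String[] args) {
        int attempt = sendWithRetry(flakySend(2), 5);
        System.out.println("Succeeded on attempt " + attempt);
    }
}
```

In this sketch nothing has to be rethrown explicitly for the retry to happen; catching the exception from get() inside the loop is enough. An explicit throw is only needed once the attempts are exhausted.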
