-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29899/#review68934
-----------------------------------------------------------


samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/29899/#comment113463>

    If there's an exception in .get(), do we need to throw it to retry?


samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
<https://reviews.apache.org/r/29899/#comment113465>

    I don't think we need all of these. Anything that we're leaving the same as Kafka's default, we can safely not set. We only need to set non-default properties.

    For any defaults that we don't use, can you remove the constants above, just to shrink the copy/paste a bit.


samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
<https://reviews.apache.org/r/29899/#comment113468>

    Same as clientId comment below.


samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
<https://reviews.apache.org/r/29899/#comment113467>

    Don't need this if you declare `val clientId` in the constructor.


samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
<https://reviews.apache.org/r/29899/#comment113479>

    Can sendFailed be eliminated if we just define exceptionThrown.get() == null as sendFailed=false, and exceptionThrown.get() != null as sendFailed=true?


samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
<https://reviews.apache.org/r/29899/#comment113476>

    new AtomicReference() is safer.


samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
<https://reviews.apache.org/r/29899/#comment113480>

    This call is cached, right? We're not going to trigger network traffic every time we call partitionsFor?


samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
<https://reviews.apache.org/r/29899/#comment113481>

    This metric should be outside of the callback. The metric currently counts every time that a message was "sent" (enqueued in the buffer).
    Rather than changing the meaning of an existing metric, we should add new metrics where we need them. I think we might need to re-think these metrics:

      val reconnects = newCounter("producer-reconnects")
      val sends = newCounter("producer-sends")
      val flushes = newCounter("flushes")
      val flushSizes = newCounter("flush-sizes")
      val flushMs = newTimer("flush-ms")

      def setBufferSize(source: String, getValue: () => Int) {
        newGauge("%s-producer-buffer-size" format source, getValue)
      }


samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
<https://reviews.apache.org/r/29899/#comment113482>

    Newline log messages are generally not good. Also, prefer hiding stack trace when retrying. See BrokerProxy example:

      warn("Restarting consumer due to %s. Turn on debugging to get a full stack trace." format exception)
      debug("Exception detail:", exception)


samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
<https://reviews.apache.org/r/29899/#comment113483>

    Seems like this code appears twice, and we also have a stop() method. Can you combine everything into a single stop() method, and call it where appropriate?


samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
<https://reviews.apache.org/r/29899/#comment113472>

    Is this used anywhere?


samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
<https://reviews.apache.org/r/29899/#comment113471>

    Seems like we could leave this public. Could also go in samza-core's util, since it seems generally useful.


samza-kafka/src/test/scala/org/apache/samza/system/kafka/MockKafkaProducer.java
<https://reviews.apache.org/r/29899/#comment113470>

    Is this copy/pasted from Kafka as well? If so, should add a comment here, and point to the KAFKA-1861 JIRA. Should also include in the same follow-on SAMZA JIRA as the test utils class, below.

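
Regarding the sendFailed comment on KafkaSystemProducer.scala above, here is roughly what I have in mind. This is only a sketch under assumptions: SendState, recordFailure, and reset are made-up names for illustration and are not part of the patch; only exceptionThrown and sendFailed come from the code under review.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for the producer's failure tracking;
// not the actual KafkaSystemProducer code.
class SendState {
  // Holds the first exception reported by a send callback, or null if none.
  private val exceptionThrown = new AtomicReference[Exception]()

  // Would be called from the send callback when a send fails (assumed hook).
  def recordFailure(e: Exception): Unit = {
    // Keep only the first failure; later ones are ignored.
    exceptionThrown.compareAndSet(null, e)
  }

  // Derived from exceptionThrown, so there is no second flag to keep in sync.
  def sendFailed: Boolean = exceptionThrown.get() != null

  // Clear the recorded failure (e.g. before a retry) and return it, if any.
  def reset(): Option[Exception] = Option(exceptionThrown.getAndSet(null))
}
```

With this shape, the callback thread only ever writes to one place, and the sending thread reads sendFailed without needing a separate boolean.
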

samza-kafka/src/test/scala/org/apache/samza/utils/TestUtils.java
<https://reviews.apache.org/r/29899/#comment113469>

    Should open a tracker SAMZA ticket linked to KAFKA-1861 when SAMZA-227 is committed.


- Chris Riccomini


On Jan. 21, 2015, 3:11 a.m., Navina Ramesh wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/29899/
> -----------------------------------------------------------
>
> (Updated Jan. 21, 2015, 3:11 a.m.)
>
>
> Review request for samza, Chris Riccomini, Guozhang Wang, and Jay Kreps.
>
>
> Bugs: SAMZA-227
>     https://issues.apache.org/jira/browse/SAMZA-227
>
>
> Repository: samza
>
>
> Description
> -------
>
> Modified logic in KafkaSystemProducer for send & flush based on the behavior
> of the new java-based Kafka producer API
> Added "MockKafkaProducer" in Samza layer to mock out the buffering behavior
> provided by Kafka producer. The MockProducer exposed by Kafka does not
> provide sufficient control for writing unit tests.
> Producer config for Kafka is unified in "KafkaProducerConfig" with
> appropriate default values. These can be overridden while instantiating the
> producer config.
>
>
> Diffs
> -----
>
>   build.gradle 7a40ad4ae916610186848c06c4577e7067de98ee
>   gradle/dependency-versions.gradle 44dd42603e93788562fd64c68312570cee71a2aa
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala 1d5627d0c561a0be6b48ee307b755958e62b783e
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala f2defbd39708e959edb1d6674e542b5bc9e02666
>   samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala e57b8ba1e09765774314ec469645b5d0bbde060f
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala 4506ea367eec4e40da45feee777ba73069025a4c
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala a0e1ccbfe9dc4fd26ca6b30fc2d1348fb7d007e4
>   samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala f1b7511775703775eaa5172d7da88d302a89aa2e
>   samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala 553d6b4d6ffe21f4a92c8c347e835d95d71b5863
>   samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala 0e1c38e5d68f2f3e42ecdb58297a11ff5d29374d
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/MockKafkaProducer.java PRE-CREATION
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala c759a7bea7b67714eaa90a97f828079f26acbca4
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala 72b36f774b2b8845539f26fc592244353cf300cd
>   samza-kafka/src/test/scala/org/apache/samza/utils/TestUtils.java PRE-CREATION
>   samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala ca25258217e5ebc44b34fbc4d69ecb28c81df618
>
> Diff: https://reviews.apache.org/r/29899/diff/
>
>
> Testing
> -------
>
> ./gradlew clean build - SUCCESSFUL
>
>
> Thanks,
>
> Navina Ramesh
>
>
