-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29899/#review68934
-----------------------------------------------------------



samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/29899/#comment113463>

    If there's an exception in .get(), do we need to throw it to retry?



samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
<https://reviews.apache.org/r/29899/#comment113465>

    I don't think we need all of these. Anything that we're leaving at Kafka's
    default can safely go unset; we only need to set non-default properties.
    
    For any defaults that we don't use, can you remove the constants above,
    just to shrink the copy/paste a bit?



samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
<https://reviews.apache.org/r/29899/#comment113468>

    Same as clientId comment below.



samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
<https://reviews.apache.org/r/29899/#comment113467>

    Don't need this if you declare `val clientId` in the constructor.
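    
    A minimal sketch of what I mean, using a simplified stand-in class
    (`ProducerConfigSketch` and its parameters are hypothetical, not the actual
    class in this patch):

```scala
// Hypothetical stand-in for the config class under review.
// Marking a constructor parameter as `val` makes it a public, immutable
// member, so no separate field or accessor is needed.
class ProducerConfigSketch(val clientId: String, val systemName: String)

object ProducerConfigSketchDemo {
  def main(args: Array[String]): Unit = {
    val config = new ProducerConfigSketch("samza-producer-1", "kafka")
    // The constructor parameter is readable directly as a member.
    println(config.clientId)
  }
}
```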



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
<https://reviews.apache.org/r/29899/#comment113479>

    Can sendFailed be eliminated? We could treat exceptionThrown.get() == null
    as sendFailed=false, and exceptionThrown.get() != null as sendFailed=true.
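    
    Roughly what I have in mind, as a sketch only. `SendStateSketch`,
    `recordFailure`, and `reset` are hypothetical names; I'm assuming the
    send callback is the only writer of exceptionThrown:

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical simplification of the producer's failure tracking.
class SendStateSketch {
  // null means no failure has been recorded since the last reset.
  private val exceptionThrown = new AtomicReference[Exception]()

  // Intended to be called from the send callback. Only the first failure
  // is kept; returns true if this call recorded it.
  def recordFailure(e: Exception): Boolean =
    exceptionThrown.compareAndSet(null, e)

  // Derived state: no separate boolean flag to keep in sync.
  def sendFailed: Boolean = exceptionThrown.get() != null

  // Returns the recorded failure (if any) and clears it in one atomic step.
  def reset(): Option[Exception] = Option(exceptionThrown.getAndSet(null))
}
```

    With this shape there is only one piece of shared state, so the flag and
    the exception can't disagree with each other.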



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
<https://reviews.apache.org/r/29899/#comment113476>

    new AtomicReference() is safer.



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
<https://reviews.apache.org/r/29899/#comment113480>

    This call is cached, right? We're not going to trigger network traffic
    every time we call partitionsFor?



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
<https://reviews.apache.org/r/29899/#comment113481>

    This metric should be outside of the callback. It currently counts every
    time a message is "sent" (enqueued in the buffer). Rather than changing
    the meaning of an existing metric, we should add new metrics where we
    need them.
    
    I think we might need to re-think these metrics:
    
        val reconnects = newCounter("producer-reconnects")
        val sends = newCounter("producer-sends")
        val flushes = newCounter("flushes")
        val flushSizes = newCounter("flush-sizes")
        val flushMs = newTimer("flush-ms")
    
        def setBufferSize(source: String, getValue: () => Int) {
          newGauge("%s-producer-buffer-size" format source, getValue)
        }
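    
    One possible shape, as a sketch under assumptions: `CounterSketch` stands
    in for Samza's counter so the snippet is self-contained, and the
    "producer-send-success" / "producer-send-failed" names are hypothetical
    suggestions, not existing metrics. The existing counter keeps its
    "enqueued" meaning and the callback gets its own counters:

```scala
import java.util.concurrent.atomic.AtomicLong

// Self-contained stand-in for a metrics counter.
class CounterSketch(val name: String) {
  private val count = new AtomicLong(0)
  def inc(): Unit = { count.incrementAndGet(); () }
  def getCount: Long = count.get()
}

class ProducerMetricsSketch {
  // Existing metric, existing meaning: message enqueued in the buffer.
  val sends = new CounterSketch("producer-sends")
  // Hypothetical new metrics, updated only from the send callback.
  val sendSuccess = new CounterSketch("producer-send-success")
  val sendFailed = new CounterSketch("producer-send-failed")
}

class SendPathSketch(metrics: ProducerMetricsSketch) {
  // Mirrors the new producer's callback contract: a null exception means
  // the send completed successfully.
  def onCompletion(error: Exception): Unit =
    if (error == null) metrics.sendSuccess.inc() else metrics.sendFailed.inc()

  // `enqueue` is a placeholder for handing the record to the producer.
  def send(enqueue: () => Unit): Unit = {
    enqueue()
    metrics.sends.inc() // counted outside the callback
  }
}
```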



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
<https://reviews.apache.org/r/29899/#comment113482>

    Log messages that contain newlines are generally not good. Also, prefer
    hiding the stack trace when retrying. See the BrokerProxy example:
    
        warn("Restarting consumer due to %s. Turn on debugging to get a full stack trace." format exception)
        debug("Exception detail:", exception)



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
<https://reviews.apache.org/r/29899/#comment113483>

    Seems like this code appears twice, and we also have a stop() method. Can
    you combine everything into a single stop() method, and call it where
    appropriate?
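    
    A rough sketch of the consolidation. `StoppableProducerSketch` and
    `closeProducer` are hypothetical; `closeProducer` stands in for whatever
    the duplicated code does to close the underlying producer. I've made
    stop() idempotent here, which is an assumption on my part and may not
    suit a path that recreates the producer afterwards:

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical wrapper around the shutdown logic.
class StoppableProducerSketch(closeProducer: () => Unit) {
  private val stopped = new AtomicBoolean(false)

  // Single place for shutdown logic; safe to call from several code paths
  // because only the first call actually closes the producer.
  def stop(): Unit = {
    if (stopped.compareAndSet(false, true)) {
      closeProducer()
    }
  }

  def isStopped: Boolean = stopped.get()
}
```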



samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
<https://reviews.apache.org/r/29899/#comment113472>

    Is this used anywhere?



samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
<https://reviews.apache.org/r/29899/#comment113471>

    Seems like we could leave this public. Could also go in samza-core's util,
    since it seems generally useful.



samza-kafka/src/test/scala/org/apache/samza/system/kafka/MockKafkaProducer.java
<https://reviews.apache.org/r/29899/#comment113470>

    Is this copy/pasted from Kafka as well? If so, we should add a comment
    here that points to the KAFKA-1861 JIRA. It should also be included in the
    same follow-on SAMZA JIRA as the test utils class, below.



samza-kafka/src/test/scala/org/apache/samza/utils/TestUtils.java
<https://reviews.apache.org/r/29899/#comment113469>

    We should open a tracking SAMZA ticket linked to KAFKA-1861 when SAMZA-227
    is committed.


- Chris Riccomini


On Jan. 21, 2015, 3:11 a.m., Navina Ramesh wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/29899/
> -----------------------------------------------------------
> 
> (Updated Jan. 21, 2015, 3:11 a.m.)
> 
> 
> Review request for samza, Chris Riccomini, Guozhang Wang, and Jay Kreps.
> 
> 
> Bugs: SAMZA-227
>     https://issues.apache.org/jira/browse/SAMZA-227
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Modified logic in KafkaSystemProducer for send & flush based on the behavior 
> of the new java-based Kafka producer API.
> Added "MockKafkaProducer" in Samza layer to mock out the buffering behavior 
> provided by Kafka producer. The MockProducer exposed by Kafka does not 
> provide sufficient control for writing unit tests.
> Producer config for Kafka is unified in "KafkaProducerConfig" with 
> appropriate default values. These can be overridden while instantiating the 
> producer config.
> 
> 
> Diffs
> -----
> 
>   build.gradle 7a40ad4ae916610186848c06c4577e7067de98ee 
>   gradle/dependency-versions.gradle 44dd42603e93788562fd64c68312570cee71a2aa 
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala 1d5627d0c561a0be6b48ee307b755958e62b783e 
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala f2defbd39708e959edb1d6674e542b5bc9e02666 
>   samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala e57b8ba1e09765774314ec469645b5d0bbde060f 
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala 4506ea367eec4e40da45feee777ba73069025a4c 
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala a0e1ccbfe9dc4fd26ca6b30fc2d1348fb7d007e4 
>   samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala f1b7511775703775eaa5172d7da88d302a89aa2e 
>   samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala 553d6b4d6ffe21f4a92c8c347e835d95d71b5863 
>   samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala 0e1c38e5d68f2f3e42ecdb58297a11ff5d29374d 
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/MockKafkaProducer.java PRE-CREATION 
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala c759a7bea7b67714eaa90a97f828079f26acbca4 
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala 72b36f774b2b8845539f26fc592244353cf300cd 
>   samza-kafka/src/test/scala/org/apache/samza/utils/TestUtils.java PRE-CREATION 
>   samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala ca25258217e5ebc44b34fbc4d69ecb28c81df618 
> 
> Diff: https://reviews.apache.org/r/29899/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build - SUCCESSFUL
> 
> 
> Thanks,
> 
> Navina Ramesh
> 
>
