> On Jan. 21, 2015, 5:50 p.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala,
> >  line 125
> > <https://reviews.apache.org/r/29899/diff/3/?file=827601#file827601line125>
> >
> >     If there's an exception in .get(), do we need to throw it to retry?
> 
> Navina Ramesh wrote:
>     My understanding is that .get() is a blocking call. If there is an 
> exception in the callback, isn't it propagated up to the get() call and caught 
> in the retry loop?

Doh, you're right.
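
For anyone reading along, here is a minimal, self-contained sketch of the behavior being discussed. It is not the code under review: `BlockingSendRetry`, `Sender`, and `sendWithRetry` are hypothetical names, and a plain `CompletableFuture` stands in for the future returned by the producer. It only shows that a failure recorded on the future surfaces from the blocking `get()` as an `ExecutionException`, which a surrounding retry loop can catch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class BlockingSendRetry {
    /** Hypothetical stand-in for a producer send that reports its outcome through a Future. */
    interface Sender {
        Future<String> send();
    }

    /** Blocks on each send and retries when the future completed with an exception.
     *  Returns the number of the attempt that succeeded. */
    static int sendWithRetry(Sender sender, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                // get() blocks; an asynchronous failure is rethrown here, wrapped in ExecutionException.
                sender.send().get();
                return attempt;
            } catch (ExecutionException e) {
                System.err.println("attempt " + attempt + " failed: " + e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for send", e);
            }
        }
        throw new IllegalStateException("send failed after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated sender: the first two sends fail, the third succeeds.
        Sender flaky = () -> {
            CompletableFuture<String> result = new CompletableFuture<>();
            if (++calls[0] < 3) {
                result.completeExceptionally(new RuntimeException("simulated send failure"));
            } else {
                result.complete("ok");
            }
            return result;
        };
        System.out.println("succeeded on attempt " + sendWithRetry(flaky, 5));
    }
}
```

So nothing needs to be rethrown by hand, as long as the retry loop wraps the `get()` call.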


> On Jan. 21, 2015, 5:50 p.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala,
> >  line 49
> > <https://reviews.apache.org/r/29899/diff/3/?file=827605#file827605line49>
> >
> >     Can sendFailed be eliminated if we just define exceptionThrown.get() == 
> > null as sendFailed=false, and exceptionThrown.get() != null as 
> > sendFailed=true?
> 
> Navina Ramesh wrote:
>     It's true that we can eliminate sendFailed with exceptionThrown. I felt 
> that this was an overloaded use of a shared variable. My idea was to use 
> "sendFailed" to determine whether to retry the loop or not and 
> "exceptionThrown" is just a shared buffer between the threads to hold the 
> exception message. It does not control the retry behavior of the loop. 
>     Let me know if you think it is simpler to go with the approach you 
> suggested, though I haven't verified all corner cases for that.

Yea, it's a trade-off here. Complexity of understanding that a single var is 
used in two different ways, vs. managing two independent variables that are 
tightly coupled. I'm not too particular about it. If you want to leave as is, 
that's fine.
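
To make the single-variable option concrete, here is a rough sketch, assuming the exception is held in an `AtomicReference`. Only the names `exceptionThrown` and `sendFailed` come from the patch; the `SendState` class and its other methods are hypothetical, and the corner cases Navina mentions are not covered.

```java
import java.util.concurrent.atomic.AtomicReference;

class SendState {
    // Shared between the sending thread and the callback thread.
    private final AtomicReference<Exception> exceptionThrown = new AtomicReference<>();

    /** Called from the callback; keeps only the first failure that was reported. */
    void recordFailure(Exception e) {
        exceptionThrown.compareAndSet(null, e);
    }

    /** sendFailed is derived from the reference instead of being a separate flag. */
    boolean sendFailed() {
        return exceptionThrown.get() != null;
    }

    /** Returns the recorded exception (or null) and resets the state before a retry. */
    Exception clear() {
        return exceptionThrown.getAndSet(null);
    }
}
```

The upside is that the flag and the exception cannot drift apart. The downside is the one raised above: a reader has to know that a non-null reference also means "retry".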


> On Jan. 21, 2015, 5:50 p.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala,
> >  line 87
> > <https://reviews.apache.org/r/29899/diff/3/?file=827605#file827605line87>
> >
> >     This call is cached, right? We're not going to trigger network traffic 
> > every time we call partitionsFor?
> 
> Navina Ramesh wrote:
>     We are not caching it. I believe the Kafka layer handles caching the 
> metadata by default and periodically refreshes it.

Yup, looks like it's cached from KafkaProducer -> Metadata -> Cluster.


> On Jan. 21, 2015, 5:50 p.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala,
> >  line 112
> > <https://reviews.apache.org/r/29899/diff/3/?file=827605#file827605line112>
> >
> >     This metric should be outside of the callback. The metric currently 
> > counts every time that a message was "sent" (enqueued in the buffer). 
> > Rather than changing the meaning of an existing metric, we should add new 
> > metrics where we need them.
> >     
> >     I think we might need to re-think these metrics:
> >     
> >         val reconnects = newCounter("producer-reconnects")
> >         val sends = newCounter("producer-sends")
> >         val flushes = newCounter("flushes")
> >         val flushSizes = newCounter("flush-sizes")
> >         val flushMs = newTimer("flush-ms")
> >     
> >         def setBufferSize(source: String, getValue: () => Int) {
> >           newGauge("%s-producer-buffer-size" format source, getValue)
> >         }
> 
> Navina Ramesh wrote:
>     Yeah. I kept metrics as an afterthought. Thanks for bringing it up. 
>     
>     producer-reconnects, flushSizes -> don't think these are useful anymore
>     flushMs : same as before
>     flushes: same as before
>     
>     producer-sends: If it means enqueued in buffer, then we can move it 
> outside the callback. 
>     If we really want to know whether the messages were sent by the Kafka 
> producer, we should do it in the callback. I don't think Kafka provides any 
> feedback other than the future for each of the sends.
>     
>     Additionally, I think we can add the following:
>     retries: whenever a retriable exception is thrown, we can increment the 
> counter. (after the warn statement)
>     flush-failed: whenever a flush fails
>     send-failed: when a send failed with an exception.

> producer-sends: If it means enqueued in buffer, then we can move it outside 
> the callback.

Yea, I think we should do that. I think we should also add a new metric for 
inside the callback, when a send succeeds. This way the existing metric will 
remain backward-compatible, but we'll have a new metric to measure successful 
sends.

The rest of your list looks good to me.
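
Roughly what I have in mind, as a self-contained sketch rather than a patch. The counters are plain `AtomicLong`s instead of Samza metrics, a `CompletableFuture` stands in for the producer's acknowledgement, and the `sendSuccess` counter name is a placeholder I made up. `sendFailed` corresponds to the send-failed metric from your list.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

class ProducerMetricsSketch {
    final AtomicLong sends = new AtomicLong();        // existing meaning: message enqueued in the buffer
    final AtomicLong sendSuccess = new AtomicLong();  // placeholder name: acknowledged in the callback
    final AtomicLong sendFailed = new AtomicLong();   // send completed with an exception

    /** Counts the enqueue immediately, and the outcome only when the acknowledgement completes. */
    CompletableFuture<Void> send(CompletableFuture<Void> ack) {
        sends.incrementAndGet(); // outside the callback, so the existing metric keeps its meaning
        return ack.whenComplete((ignored, error) -> {
            if (error == null) {
                sendSuccess.incrementAndGet();
            } else {
                sendFailed.incrementAndGet();
            }
        });
    }
}
```

With this split, `sends` minus `sendSuccess` minus `sendFailed` gives the number of messages still in flight.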


- Chris


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29899/#review68934
-----------------------------------------------------------


On Jan. 21, 2015, 3:11 a.m., Navina Ramesh wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/29899/
> -----------------------------------------------------------
> 
> (Updated Jan. 21, 2015, 3:11 a.m.)
> 
> 
> Review request for samza, Chris Riccomini, Guozhang Wang, and Jay Kreps.
> 
> 
> Bugs: SAMZA-227
>     https://issues.apache.org/jira/browse/SAMZA-227
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Modified logic in KafkaSystemProducer for send & flush based on the behavior 
> of the new java-based Kafka producer API
> Added "MockKafkaProducer" in Samza layer to mock out the buffering behavior 
> provided by Kafka producer. The MockProducer exposed by Kafka does not 
> provide sufficient control for writing unit tests.
> Producer config for Kafka is unified in "KafkaProducerConfig" with 
> appropriate default values. These can be overridden while instantiating the 
> producer config.
> 
> 
> Diffs
> -----
> 
>   build.gradle 7a40ad4ae916610186848c06c4577e7067de98ee 
>   gradle/dependency-versions.gradle 44dd42603e93788562fd64c68312570cee71a2aa 
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala 1d5627d0c561a0be6b48ee307b755958e62b783e 
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala f2defbd39708e959edb1d6674e542b5bc9e02666 
>   samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala e57b8ba1e09765774314ec469645b5d0bbde060f 
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala 4506ea367eec4e40da45feee777ba73069025a4c 
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala a0e1ccbfe9dc4fd26ca6b30fc2d1348fb7d007e4 
>   samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala f1b7511775703775eaa5172d7da88d302a89aa2e 
>   samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala 553d6b4d6ffe21f4a92c8c347e835d95d71b5863 
>   samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala 0e1c38e5d68f2f3e42ecdb58297a11ff5d29374d 
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/MockKafkaProducer.java PRE-CREATION 
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala c759a7bea7b67714eaa90a97f828079f26acbca4 
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala 72b36f774b2b8845539f26fc592244353cf300cd 
>   samza-kafka/src/test/scala/org/apache/samza/utils/TestUtils.java PRE-CREATION 
>   samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala ca25258217e5ebc44b34fbc4d69ecb28c81df618 
> 
> Diff: https://reviews.apache.org/r/29899/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build - SUCCESSFUL
> 
> 
> Thanks,
> 
> Navina Ramesh
> 
>
