-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52598/#review151794
-----------------------------------------------------------



Patch applies cleanly on trunk currently (it will probably conflict after 
FLUME-2971). Code compiles with "mvn clean install -DskipTests", and the site builds 
without errors in the generated HTML. When running tests I got failures in 
the "mvn test -pl flume-ng-sinks/flume-ng-kafka-sink -fae" results:

Failed tests:   
  - testDefaultTopic(org.apache.flume.sink.kafka.TestKafkaSink): 
expected:<[default-topic-test]> but was:<[9]>
  - testEmptyChannel(org.apache.flume.sink.kafka.TestKafkaSink)

"mvn test -pl flume-ng-sinks/flume-ng-kafka-sink -fae" passes on trunk, so this 
patch broke these tests.

"mvn test -pl flume-ng-channels/flume-kafka-channel -fae" passed.


flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
 (line 439)
<https://reviews.apache.org/r/52598/#comment220331>

    A NumberFormatException is raised if the header contains a non-numeric value, instead of 
defaulting to staticPartitionId and sending the event with that. I agree with 
this behaviour, i.e. dropping the whole event because of an occasionally malformed 
header: Flume should either rely on the content of a processed event or not 
(including both body and headers). What happens to that Event afterwards? Is it 
retried, or is there a backoff?
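    To make the behaviour under discussion concrete, here is a minimal sketch, 
assuming the channel takes the partition from the configured header when it is 
present and otherwise falls back to the static partition ID. The class and 
method names (PartitionResolver, resolve) and the fallback-when-absent rule are 
hypothetical, not taken from the patch; the point is that Integer.parseInt 
propagates a NumberFormatException for a malformed header rather than silently 
using staticPartitionId.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper illustrating the partition choice discussed above.
// Names and the fallback rule are assumptions, not the code in the patch.
public class PartitionResolver {

    /**
     * Returns the partition to use, or null to let Kafka's partitioner decide.
     *
     * @param headers           event headers
     * @param partitionHeader   name of the header holding the partition ID (may be null)
     * @param staticPartitionId partition used when the header is not configured or absent (may be null)
     * @throws NumberFormatException if the configured header holds a non-numeric value
     */
    public static Integer resolve(Map<String, String> headers, String partitionHeader,
                                  Integer staticPartitionId) {
        if (partitionHeader != null) {
            String value = headers.get(partitionHeader);
            if (value != null) {
                // A malformed value is NOT replaced by staticPartitionId:
                // parseInt throws and the whole event fails.
                return Integer.parseInt(value.trim());
            }
        }
        return staticPartitionId;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("partition-id", "3");
        System.out.println(resolve(headers, "partition-id", 0)); // prints 3

        headers.put("partition-id", "abc");
        try {
            resolve(headers, "partition-id", 0);
        } catch (NumberFormatException e) {
            // The caller (channel/sink) decides whether to retry or back off.
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

    Whether the caller then retries or backs off is exactly the open question 
above; the sketch only shows where the exception originates.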



flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
 (lines 254 - 288)
<https://reviews.apache.org/r/52598/#comment220335>

    Would you mind refactoring these? I would expect a separate test class for 
testing headers, with the common parts in @Before/@After or utility methods, and 
each test method body defining its own header setup. Please don't use boolean 
flags plus a utility function for code reuse. Once that separate test class 
exists, the (slightly better) Enum+switch cases can also be moved there and 
refactored into individual test method bodies.



flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
 (lines 430 - 449)
<https://reviews.apache.org/r/52598/#comment220337>

    Could you please explain this a bit more? I understand you would like to 
get a specific message/topic distribution. What I don't understand is how 
intermixing the loop variable with a check for staticPtn == null helps with this.



flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
 (lines 463 - 479)
<https://reviews.apache.org/r/52598/#comment220339>

    This looks fine: it collects the results from each TopicPartition. It could 
have been a utility function, though.
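    As an illustration of the utility-function suggestion, here is a minimal 
sketch that groups consumed payloads by the partition they were read from. The 
Message class is a hypothetical stand-in for whatever record type the test 
consumer returns (it is not the Kafka API), and PartitionCollector and 
collectByPartition are illustrative names only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical test utility: groups consumed payloads by partition.
public class PartitionCollector {

    /** Minimal stand-in for a consumed record (assumption, not a Kafka type). */
    public static final class Message {
        final int partition;
        final String payload;

        public Message(int partition, String payload) {
            this.partition = partition;
            this.payload = payload;
        }
    }

    /** Returns payloads keyed by partition, sorted by partition number. */
    public static Map<Integer, List<String>> collectByPartition(List<Message> messages) {
        Map<Integer, List<String>> result = new TreeMap<>();
        for (Message m : messages) {
            // Create the per-partition list on first use, then append in arrival order.
            result.computeIfAbsent(m.partition, p -> new ArrayList<>()).add(m.payload);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Message> consumed = Arrays.asList(
            new Message(0, "a"), new Message(2, "b"), new Message(0, "c"));
        System.out.println(collectByPartition(consumed)); // {0=[a, c], 2=[b]}
    }
}
```

    With such a helper, each test method would only assert on the returned map 
for the partition layout it expects.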



flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
 (lines 485 - 497)
<https://reviews.apache.org/r/52598/#comment220338>

    Could you please be a bit more explicit about what these cases cover? E.g. by 
introducing new booleans with somewhat more descriptive names.



flume-ng-doc/sphinx/FlumeUserGuide.rst (lines 2584 - 2587)
<https://reviews.apache.org/r/52598/#comment220300>

    Please do not leave a TODO in here :)



flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
 (lines 217 - 222)
<https://reviews.apache.org/r/52598/#comment220340>

    If the action is the same in every catch block and the last caught type is the 
base class of the specialized ones, then the specialized blocks are not needed, 
i.e. only 
    
    "} catch (Exception ex) {...}" 
    
    is required.
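    A minimal sketch of the consolidation, assuming every specialized catch block 
performed the same handling. sendEvent is a hypothetical stand-in for the sink's 
processing logic, and the returned strings merely label the outcome.

```java
import java.io.IOException;

// Illustration: one catch of the common supertype replaces several
// specialized catch blocks that all did the same thing.
public class CatchConsolidation {

    // Hypothetical stand-in that fails in different ways depending on mode.
    static void sendEvent(int mode) throws IOException {
        if (mode == 1) throw new IOException("io failure");
        if (mode == 2) throw new IllegalStateException("bad state");
        if (mode == 3) throw new NumberFormatException("bad header");
    }

    public static String process(int mode) {
        try {
            sendEvent(mode);
            return "READY";
        } catch (Exception ex) {
            // Whatever handling the specialized blocks repeated goes here once.
            return "BACKOFF: " + ex.getMessage();
        }
    }

    public static void main(String[] args) {
        for (int mode = 0; mode <= 3; mode++) {
            System.out.println(process(mode));
        }
    }
}
```

    If the handling did differ for some types, a Java 7 multi-catch such as 
catch (IOException | IllegalStateException ex) keeps the shared cases together 
without duplicating the block.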



flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaConsumer.java
 (line 55)
<https://reviews.apache.org/r/52598/#comment220306>

    Is this utility used outside of this class? I haven't found any references. 
If not, then I think its visibility modifier shouldn't change.


- Attila Simon


On Oct. 6, 2016, 12:13 p.m., Tristan Stevens wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/52598/
> -----------------------------------------------------------
> 
> (Updated Oct. 6, 2016, 12:13 p.m.)
> 
> 
> Review request for Flume and Grant Henke.
> 
> 
> Repository: flume-git
> 
> 
> Description
> -------
> 
> This feature is useful for anyone who needs greater control over which 
> partitions are being written to - normally in a situation where multiple 
> Flume agents are being deployed in order to horizontally scale, or 
> alternatively if there is a scenario where there is a skew in data that might 
> lead to one or more partitions hotspotting.
> We also have the ability to specify custom partitioners on the Kafka 
> Producer itself using the kafka.* configuration properties.
> 
> The Kafka Producer provides the ability to set the partition ID using the 
> following constructor 
> (https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html#ProducerRecord%28java.lang.String,%20java.lang.Integer,%20K,%20V%29
>  ). This is just a matter of providing the option to use this constructor.
> 
> This is specified in one of two ways: either via the staticPartition 
> configuration property, which means that every message goes to the specified 
> partition, or via the partitionHeader configuration property, which directs 
> the implementation to retrieve the partitionId from one of the event headers.
> 
> 
> Diffs
> -----
> 
>   flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java 66b553a 
>   flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java 3ab807b 
>   flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java 57c0b28 
>   flume-ng-doc/sphinx/FlumeUserGuide.rst ab71d38 
>   flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java 89bdd84 
>   flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java 1bf380c 
>   flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java 76eca37 
>   flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaConsumer.java d5dfbd6 
> 
> Diff: https://reviews.apache.org/r/52598/diff/
> 
> 
> Testing
> -------
> 
> Unit testing done for both Kafka Channel and Kafka Sink.
> 
> 
> Thanks,
> 
> Tristan Stevens
> 
>
