-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52598/#review151794
-----------------------------------------------------------

Patch applies cleanly on trunk currently (it will probably conflict after FLUME-2971).
Code compiles with "mvn clean install -DskipTests", and the site builds with no error messages in the generated HTML.

When running the tests I got errors in the "mvn test -pl flume-ng-sinks/flume-ng-kafka-sink -fae" results:

Failed tests:
- testDefaultTopic(org.apache.flume.sink.kafka.TestKafkaSink): expected:<[default-topic-test]> but was:<[9]>
- testEmptyChannel(org.apache.flume.sink.kafka.TestKafkaSink)

"mvn test -pl flume-ng-sinks/flume-ng-kafka-sink -fae" passes on trunk, so this patch broke these tests.
"mvn test -pl flume-ng-channels/flume-kafka-channel -fae" passed.


flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java (line 439)
<https://reviews.apache.org/r/52598/#comment220331>

    A NumberFormatException is raised if the header contains a non-number, instead of defaulting to staticPartitionId and sending the event with that.
    I agree with this behaviour, i.e. dropping the whole event because of an occasionally malformed header. Flume should either rely on the content of a processed event or not (including body and headers).
    What happens with that event afterwards? Retry or backoff?


flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java (lines 254 - 288)
<https://reviews.apache.org/r/52598/#comment220335>

    Would you mind refactoring these? I would expect an extra test class for testing headers, with the common parts in @Before/@After or utility methods, and each test function body defining its own header setup.
    Please don't use boolean flags + a utility function for code reuse.
    Once the extra test class has been created, the (slightly better) Enum + switch cases can also be refactored there into test function bodies.


flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java (lines 430 - 449)
<https://reviews.apache.org/r/52598/#comment220337>

    Could you please explain this a bit more?
    I understand you would like to get a specific message/topic distribution. What I don't get is how intermixing the cycle variable with a check for staticPtn == null helps with that.


flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java (lines 463 - 479)
<https://reviews.apache.org/r/52598/#comment220339>

    This looks fine. It collects the results from each TopicPartition; it could have been a utility function.


flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java (lines 485 - 497)
<https://reviews.apache.org/r/52598/#comment220338>

    Could you please be a bit more explicit about what these cases cover? E.g. by introducing new booleans with more descriptive names.


flume-ng-doc/sphinx/FlumeUserGuide.rst (lines 2584 - 2587)
<https://reviews.apache.org/r/52598/#comment220300>

    Please do not leave a TODO here :)


flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java (lines 217 - 222)
<https://reviews.apache.org/r/52598/#comment220340>

    If every catch block performs the same action and the last one catches the base type of the specialized ones, then the specialized ones are not needed, i.e. only "} catch (Exception ex) {...}" is required.


flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaConsumer.java (line 55)
<https://reviews.apache.org/r/52598/#comment220306>

    Is this utility used outside of this class? I haven't found any references. If not, then I think its visibility modifier shouldn't change.


- Attila Simon


On Oct. 6, 2016, 12:13 p.m., Tristan Stevens wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/52598/
> -----------------------------------------------------------
> 
> (Updated Oct. 6, 2016, 12:13 p.m.)
> 
> 
> Review request for Flume and Grant Henke.
> 
> 
> Repository: flume-git
> 
> 
> Description
> -------
> 
> This feature is useful for anyone who needs greater control over which
> partitions are written to, typically when multiple Flume agents are
> deployed to scale horizontally, or when skew in the data might lead to
> one or more partitions hotspotting.
> We can also specify a custom partitioner on the Kafka Producer itself
> using the kafka.* configuration properties.
> 
> The Kafka Producer provides the ability to set the partition ID using the
> following constructor
> (https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html#ProducerRecord%28java.lang.String,%20java.lang.Integer,%20K,%20V%29
> ); this patch simply provides the option to use that constructor.
> 
> The partition is specified in one of two ways: either via the staticPartition
> configuration property, which means that every message goes to the specified
> partition, or via the partitionHeader configuration property, which directs
> the implementation to retrieve the partitionId from one of the event headers.
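
The partition-selection rule described above can be sketched in Java as follows. This is a minimal, hypothetical sketch, not the patch's code: the class and method names (PartitionSelection, choosePartition) are illustrative, it assumes that a header value, when present, takes precedence over staticPartition, and it lets a non-numeric header raise NumberFormatException, which is the behaviour discussed in the review comment on KafkaChannel.java (line 439). A null result stands for "no explicit partition".

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper (not the patch's code): chooses the partition ID that
 * would be passed to a Kafka ProducerRecord. A null result means "no explicit
 * partition", i.e. leave the choice to the producer's partitioner.
 */
class PartitionSelection {

  /**
   * @param headers          the Flume event headers
   * @param staticPartition  value of the staticPartition property, or null if unset
   * @param partitionHeader  value of the partitionHeader property, or null if unset
   * @return the partition to write to, or null to let Kafka decide
   * @throws NumberFormatException if the header value is not an integer
   */
  static Integer choosePartition(Map<String, String> headers,
                                 Integer staticPartition,
                                 String partitionHeader) {
    if (partitionHeader != null) {
      String value = headers.get(partitionHeader);
      if (value != null) {
        // Assumption: a header value, when present, overrides staticPartition.
        // Integer.valueOf throws NumberFormatException for non-numeric input,
        // so a malformed header fails the event instead of falling back.
        return Integer.valueOf(value);
      }
    }
    // No usable header: fall back to the static partition (may be null).
    return staticPartition;
  }

  public static void main(String[] args) {
    Map<String, String> headers = new HashMap<>();
    headers.put("partition", "2");
    System.out.println(choosePartition(headers, 5, "partition"));         // 2
    System.out.println(choosePartition(new HashMap<>(), 5, "partition")); // 5
    System.out.println(choosePartition(new HashMap<>(), null, null));     // null
  }
}
```

The chosen value would then be passed as the partition argument of the ProducerRecord(topic, partition, key, value) constructor linked above; in the Kafka producer API a null partition leaves the choice to the producer's partitioner. Catching NumberFormatException and returning staticPartition instead would implement the fallback alternative raised in the review.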
> 
> 
> Diffs
> -----
> 
>   flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java 66b553a 
>   flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java 3ab807b 
>   flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java 57c0b28 
>   flume-ng-doc/sphinx/FlumeUserGuide.rst ab71d38 
>   flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java 89bdd84 
>   flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java 1bf380c 
>   flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java 76eca37 
>   flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaConsumer.java d5dfbd6 
> 
> Diff: https://reviews.apache.org/r/52598/diff/
> 
> 
> Testing
> -------
> 
> Unit testing done for both Kafka Channel and Kafka Sink.
> 
> 
> Thanks,
> 
> Tristan Stevens
> 
>
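
The review comment on KafkaSink.java (lines 217 - 222) about redundant catch blocks can be illustrated with a small hypothetical Java example (not the patch's code; the class and method names are invented for illustration). When every catch block performs the same action and the last one catches the common supertype, the specialized blocks add nothing, and a single catch (Exception ex) behaves identically.

```java
/**
 * Hypothetical illustration (not the patch's code): specialized catch blocks
 * that repeat the action of a trailing catch (Exception ex) are redundant.
 */
class CatchCollapse {

  // Redundant form: the NumberFormatException block does exactly what the
  // Exception block does, and Exception already covers it.
  static String verbose(String raw) {
    try {
      return "partition " + Integer.parseInt(raw.trim());
    } catch (NumberFormatException ex) {
      return "failed: " + ex.getClass().getSimpleName();
    } catch (Exception ex) {
      return "failed: " + ex.getClass().getSimpleName();
    }
  }

  // Equivalent form: one catch of the base type handles every case,
  // including the NullPointerException thrown by raw.trim() for null input.
  static String collapsed(String raw) {
    try {
      return "partition " + Integer.parseInt(raw.trim());
    } catch (Exception ex) {
      return "failed: " + ex.getClass().getSimpleName();
    }
  }

  public static void main(String[] args) {
    for (String raw : new String[] {"3", "abc", null}) {
      System.out.println(verbose(raw) + " | " + collapsed(raw));
    }
  }
}
```

Keeping separate catch blocks only pays off when the specialized handlers actually do something different, for example retrying on one exception type and failing fast on another.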
