Repository: flume Updated Branches: refs/heads/flume-1.7 ad9d4555a -> f2bdd57fc
FLUME-2915: The kafka channel using new APIs will be stuck when the sink is avro sink (Jeff Holoman via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/f2bdd57f Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/f2bdd57f Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/f2bdd57f Branch: refs/heads/flume-1.7 Commit: f2bdd57fc4f1eaa411c08e71e67eed766af7b731 Parents: ad9d455 Author: Jarek Jarcec Cecho <[email protected]> Authored: Fri Jun 10 15:34:24 2016 +0200 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Fri Jun 10 15:35:14 2016 +0200 ---------------------------------------------------------------------- .../flume/channel/kafka/KafkaChannel.java | 4 ++- .../flume/channel/kafka/TestKafkaChannel.java | 29 ++++++++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/f2bdd57f/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java index 2d9b0c6..09d3f9d 100644 --- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java +++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java @@ -381,7 +381,9 @@ public class KafkaChannel extends BasicChannelSemantics { } //Add the key to the header - e.getHeaders().put(KEY_HEADER, record.key()); + if (record.key() != null) { + e.getHeaders().put(KEY_HEADER, record.key()); + } if (logger.isDebugEnabled()) { logger.debug("Processed output from partition {} offset {}", record.partition(), record.offset()); http://git-wip-us.apache.org/repos/asf/flume/blob/f2bdd57f/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java index 637428d..13e073b 100644 --- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java +++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java @@ -186,6 +186,11 @@ public class TestKafkaChannel { doParseAsFlumeEventFalseAsSource(true); } + @Test + public void testNullKeyNoHeader() throws Exception { + doTestNullKeyNoHeader(); + } + private void doParseAsFlumeEventFalse(Boolean checkHeaders) throws Exception { final KafkaChannel channel = startChannel(false); Properties props = channel.getProducerProps(); @@ -215,6 +220,30 @@ public class TestKafkaChannel { channel.stop(); } + private void doTestNullKeyNoHeader() throws Exception { + final KafkaChannel channel = startChannel(false); + Properties props = channel.getProducerProps(); + KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props); + + for (int i = 0; i < 50; i++) { + ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, null, String.valueOf(i).getBytes()); + producer.send(data).get(); + } + ExecutorCompletionService<Void> submitterSvc = new + ExecutorCompletionService<Void>(Executors.newCachedThreadPool()); + List<Event> events = pullEvents(channel, submitterSvc, + 50, false, false); + wait(submitterSvc, 5); + List<String> finals = new ArrayList<String>(50); + for (int i = 0; i < 50; i++) { + finals.add(i, events.get(i).getHeaders().get(KEY_HEADER)); + } + for (int i = 0; i < 50; i++) { + Assert.assertTrue( finals.get(i) == null); + } + channel.stop(); + } + /** * Like the previous test but here we write to the channel like a Flume source would do * to verify that the events are written as text and not as an Avro object
