Repository: nifi Updated Branches: refs/heads/master 99b4af782 -> 0dbba811f
NIFI-2298 This closes #687 added Kafka consume attributes to a FlowFile Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0dbba811 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0dbba811 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0dbba811 Branch: refs/heads/master Commit: 0dbba811f364768224a8a2e4a18f07d40568d133 Parents: 99b4af7 Author: Oleg Zhurakousky <[email protected]> Authored: Wed Jul 20 12:18:55 2016 -0400 Committer: joewitt <[email protected]> Committed: Thu Jul 21 22:56:24 2016 -0400 ---------------------------------------------------------------------- .../nifi/processors/kafka/pubsub/ConsumeKafka.java | 13 +++++++++++++ .../nifi/processors/kafka/pubsub/ConsumeKafkaTest.java | 3 +++ 2 files changed, 16 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/0dbba811/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java index 5949bf0..2ed2db9 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java @@ -21,8 +21,10 @@ import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -161,6 +163,7 @@ public class ConsumeKafka extends AbstractKafkaProcessor<Consumer<byte[], byte[] long start = System.nanoTime(); FlowFile flowFile = processSession.create(); final AtomicInteger messageCounter = new AtomicInteger(); + final Map<String, String> kafkaAttributes = new HashMap<>(); final Iterator<ConsumerRecord<byte[], byte[]>> iter = consumedRecords.iterator(); while (iter.hasNext()){ @@ -168,12 +171,22 @@ public class ConsumeKafka extends AbstractKafkaProcessor<Consumer<byte[], byte[] @Override public void process(final OutputStream out) throws IOException { ConsumerRecord<byte[], byte[]> consumedRecord = iter.next(); + + kafkaAttributes.put("kafka.offset", String.valueOf(consumedRecord.offset())); + if (consumedRecord.key() != null) { + kafkaAttributes.put("kafka.key", new String(consumedRecord.key(), StandardCharsets.UTF_8)); + } + kafkaAttributes.put("kafka.partition", String.valueOf(consumedRecord.partition())); + kafkaAttributes.put("kafka.topic", consumedRecord.topic()); + if (messageCounter.getAndIncrement() > 0 && ConsumeKafka.this.demarcatorBytes != null) { out.write(ConsumeKafka.this.demarcatorBytes); } out.write(consumedRecord.value()); } }); + + flowFile = processSession.putAllAttributes(flowFile, kafkaAttributes); /* * Release FlowFile if there are more messages in the * ConsumerRecords batch and no demarcator was provided, http://git-wip-us.apache.org/repos/asf/nifi/blob/0dbba811/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java index 2031e76..374a91b 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java @@ -150,6 +150,9 @@ public class ConsumeKafkaTest { assertEquals(2, flowFiles.size()); MockFlowFile flowFile = flowFiles.get(0); String[] events = new String(flowFile.toByteArray(), StandardCharsets.UTF_8).split("blah"); + assertEquals("0", flowFile.getAttribute("kafka.partition")); + assertEquals("0", flowFile.getAttribute("kafka.offset")); + assertEquals("validateGetAllMessagesWithProvidedDemarcator", flowFile.getAttribute("kafka.topic")); assertEquals(2, events.length);
