Repository: nifi
Updated Branches:
  refs/heads/master 99b4af782 -> 0dbba811f


NIFI-2298 This closes #687 added Kafka consume attributes to a FlowFile


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0dbba811
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0dbba811
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0dbba811

Branch: refs/heads/master
Commit: 0dbba811f364768224a8a2e4a18f07d40568d133
Parents: 99b4af7
Author: Oleg Zhurakousky <[email protected]>
Authored: Wed Jul 20 12:18:55 2016 -0400
Committer: joewitt <[email protected]>
Committed: Thu Jul 21 22:56:24 2016 -0400

----------------------------------------------------------------------
 .../nifi/processors/kafka/pubsub/ConsumeKafka.java     | 13 +++++++++++++
 .../nifi/processors/kafka/pubsub/ConsumeKafkaTest.java |  3 +++
 2 files changed, 16 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0dbba811/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
index 5949bf0..2ed2db9 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
@@ -21,8 +21,10 @@ import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -161,6 +163,7 @@ public class ConsumeKafka extends 
AbstractKafkaProcessor<Consumer<byte[], byte[]
             long start = System.nanoTime();
             FlowFile flowFile = processSession.create();
             final AtomicInteger messageCounter = new AtomicInteger();
+            final Map<String, String> kafkaAttributes = new HashMap<>();
 
             final Iterator<ConsumerRecord<byte[], byte[]>> iter = 
consumedRecords.iterator();
             while (iter.hasNext()){
@@ -168,12 +171,22 @@ public class ConsumeKafka extends 
AbstractKafkaProcessor<Consumer<byte[], byte[]
                     @Override
                     public void process(final OutputStream out) throws 
IOException {
                         ConsumerRecord<byte[], byte[]> consumedRecord = 
iter.next();
+
+                        kafkaAttributes.put("kafka.offset", 
String.valueOf(consumedRecord.offset()));
+                        if (consumedRecord.key() != null) {
+                            kafkaAttributes.put("kafka.key", new 
String(consumedRecord.key(), StandardCharsets.UTF_8));
+                        }
+                        kafkaAttributes.put("kafka.partition", 
String.valueOf(consumedRecord.partition()));
+                        kafkaAttributes.put("kafka.topic", 
consumedRecord.topic());
+
                         if (messageCounter.getAndIncrement() > 0 && 
ConsumeKafka.this.demarcatorBytes != null) {
                             out.write(ConsumeKafka.this.demarcatorBytes);
                         }
                         out.write(consumedRecord.value());
                     }
                 });
+
+                flowFile = processSession.putAllAttributes(flowFile, 
kafkaAttributes);
                 /*
                  * Release FlowFile if there are more messages in the
                  * ConsumerRecords batch and no demarcator was provided,

http://git-wip-us.apache.org/repos/asf/nifi/blob/0dbba811/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
index 2031e76..374a91b 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -150,6 +150,9 @@ public class ConsumeKafkaTest {
         assertEquals(2, flowFiles.size());
         MockFlowFile flowFile = flowFiles.get(0);
         String[] events = new String(flowFile.toByteArray(), 
StandardCharsets.UTF_8).split("blah");
+        assertEquals("0", flowFile.getAttribute("kafka.partition"));
+        assertEquals("0", flowFile.getAttribute("kafka.offset"));
+        assertEquals("validateGetAllMessagesWithProvidedDemarcator", 
flowFile.getAttribute("kafka.topic"));
 
         assertEquals(2, events.length);
 

Reply via email to