Repository: nifi
Updated Branches:
  refs/heads/master 582df7f4e -> 7ad752015
NIFI-4437: This closes #2183. When using ConsumeKafka_0_11 and no message demarcator, ensure that we add FlowFile Attributes for any Message Header that matches the 'Headers to Add as Attributes (Regex)' property

Signed-off-by: joewitt <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7ad75201
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7ad75201
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7ad75201

Branch: refs/heads/master
Commit: 7ad752015037794b30827dcf728afabed925b778
Parents: 582df7f
Author: Mark Payne <[email protected]>
Authored: Thu Sep 28 15:34:22 2017 -0400
Committer: joewitt <[email protected]>
Committed: Fri Oct 6 15:06:32 2017 -0400

----------------------------------------------------------------------
 .../java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi/blob/7ad75201/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 0587788..eed797e 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -402,6 +402,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
                 out.write(value);
             });
         }
+        flowFile = session.putAllAttributes(flowFile, getAttributes(record));
         tracker.updateFlowFile(flowFile);
         populateAttributes(tracker);
         session.transfer(tracker.flowFile, REL_SUCCESS);
