This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 96ba9cc710 NIFI-15550 - Add optional header prefix property in ConsumeKafka
96ba9cc710 is described below

commit 96ba9cc710f2ca81a8f52e99d36a4bf75144bd62
Author: Pierre Villard <[email protected]>
AuthorDate: Wed Feb 4 18:26:19 2026 +0100

    NIFI-15550 - Add optional header prefix property in ConsumeKafka
    
    This closes #10853.
    
    Signed-off-by: Peter Turcsanyi <[email protected]>
---
 .../nifi/kafka/processors/ConsumeKafkaIT.java      |  48 ++++++
 .../apache/nifi/kafka/processors/ConsumeKafka.java |  21 ++-
 .../nifi/kafka/processors/common/KafkaUtils.java   |   9 +-
 .../FlowFileStreamKafkaMessageConverter.java       |   5 +-
 .../kafka/processors/common/KafkaUtilsTest.java    | 169 +++++++++++++++++++++
 5 files changed, 249 insertions(+), 3 deletions(-)

diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java
index 03523a8cf3..e4e12898e3 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java
@@ -116,6 +116,54 @@ class ConsumeKafkaIT extends AbstractConsumeKafkaIT {
         flowFile.assertAttributeNotExists("ccc");
     }
 
+    @Test
+    void testProcessingStrategyFlowFileHeaderNamePrefix() throws InterruptedException, ExecutionException {
+        final String topic = UUID.randomUUID().toString();
+        final String groupId = topic.substring(0, topic.indexOf("-"));
+
+        runner.setProperty(ConsumeKafka.GROUP_ID, groupId);
+        runner.setProperty(ConsumeKafka.TOPICS, topic);
+        runner.setProperty(ConsumeKafka.PROCESSING_STRATEGY, ProcessingStrategy.FLOW_FILE.getValue());
+        runner.setProperty(ConsumeKafka.HEADER_NAME_PATTERN, ".*");
+        runner.setProperty(ConsumeKafka.HEADER_NAME_PREFIX, "kafka.header.");
+
+        runner.run(1, false, true);
+
+        final List<Header> headers = Arrays.asList(
+                new RecordHeader("uuid", "test-uuid-value".getBytes(StandardCharsets.UTF_8)),
+                new RecordHeader("filename", "test-filename".getBytes(StandardCharsets.UTF_8)),
+                new RecordHeader("custom", "custom-value".getBytes(StandardCharsets.UTF_8)));
+        produceOne(topic, 0, null, RECORD_VALUE, headers);
+        while (runner.getFlowFilesForRelationship("success").isEmpty()) {
+            runner.run(1, false, false);
+        }
+
+        runner.run(1, true, false);
+
+        final Iterator<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.SUCCESS).iterator();
+        assertTrue(flowFiles.hasNext());
+
+        final MockFlowFile flowFile = flowFiles.next();
+        flowFile.assertContentEquals(RECORD_VALUE);
+        flowFile.assertAttributeEquals(KafkaFlowFileAttribute.KAFKA_TOPIC, topic);
+        flowFile.assertAttributeEquals(KafkaFlowFileAttribute.KAFKA_PARTITION, Integer.toString(FIRST_PARTITION));
+        flowFile.assertAttributeEquals(KafkaFlowFileAttribute.KAFKA_OFFSET, Long.toString(FIRST_OFFSET));
+        flowFile.assertAttributeExists(KafkaFlowFileAttribute.KAFKA_TIMESTAMP);
+        flowFile.assertAttributeEquals(KafkaFlowFileAttribute.KAFKA_HEADER_COUNT, "3");
+
+        // Verify headers are prefixed with "kafka.header."
+        flowFile.assertAttributeEquals("kafka.header.uuid", "test-uuid-value");
+        flowFile.assertAttributeEquals("kafka.header.filename", "test-filename");
+        flowFile.assertAttributeEquals("kafka.header.custom", "custom-value");
+
+        // Verify the Kafka header values did NOT overwrite the core FlowFile attributes
+        // Note: "uuid" and "filename" are core FlowFile attributes that always exist,
+        // but they should NOT contain the Kafka header values when prefix is used
+        flowFile.assertAttributeExists("uuid"); // Core attribute exists
+        flowFile.assertAttributeNotEquals("uuid", "test-uuid-value"); // But not with Kafka header value
+        flowFile.assertAttributeNotExists("custom"); // Non-core attributes should not exist without prefix
+    }
+
     /**
      * Test ability to specify a topic regular expression to query for messages.
      */
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
index 455c5ee5dc..336f6e77d9 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
@@ -210,6 +210,18 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
             .expressionLanguageSupported(NONE)
             .build();
 
+    static final PropertyDescriptor HEADER_NAME_PREFIX = new PropertyDescriptor.Builder()
+            .name("Header Name Prefix")
+            .description("""
+                    A prefix to apply to the FlowFile attribute name when writing Kafka Record Header values.
+                    This is useful to avoid conflicts with reserved FlowFile attribute names such as 'uuid'.
+                    For example, if set to 'kafka.header.', a Kafka header named 'uuid' would be written as 'kafka.header.uuid'.
+                    """)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .required(false)
+            .dependsOn(PROCESSING_STRATEGY, ProcessingStrategy.FLOW_FILE)
+            .build();
+
     static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
             .name("Record Reader")
             .description("The Record Reader to use for incoming Kafka messages")
@@ -304,6 +316,7 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
             HEADER_NAME_PATTERN,
             HEADER_ENCODING,
             PROCESSING_STRATEGY,
+            HEADER_NAME_PREFIX,
             RECORD_READER,
             RECORD_WRITER,
             OUTPUT_STRATEGY,
@@ -319,6 +332,7 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
 
     private volatile Charset headerEncoding;
     private volatile Pattern headerNamePattern;
+    private volatile String headerNamePrefix;
     private volatile ProcessingStrategy processingStrategy;
     private volatile KeyEncoding keyEncoding;
     private volatile OutputStrategy outputStrategy;
@@ -366,6 +380,11 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
         keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).asAllowableValue(KeyEncoding.class);
         commitOffsets = context.getProperty(COMMIT_OFFSETS).asBoolean();
         processingStrategy = context.getProperty(PROCESSING_STRATEGY).asAllowableValue(ProcessingStrategy.class);
+
+        // Only read HEADER_NAME_PREFIX when PROCESSING_STRATEGY is FLOW_FILE (property dependency)
+        headerNamePrefix = processingStrategy == ProcessingStrategy.FLOW_FILE
+                ? context.getProperty(HEADER_NAME_PREFIX).getValue()
+                : null;
         outputStrategy = processingStrategy == ProcessingStrategy.RECORD ? context.getProperty(OUTPUT_STRATEGY).asAllowableValue(OutputStrategy.class) : null;
         keyFormat = (outputStrategy == OutputStrategy.USE_WRAPPER || outputStrategy == OutputStrategy.INJECT_METADATA)
                 ? context.getProperty(KEY_FORMAT).asAllowableValue(KeyFormat.class)
@@ -612,7 +631,7 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
 
     private void processInputFlowFile(final ProcessSession session, final OffsetTracker offsetTracker, final Iterator<ByteRecord> consumerRecords) {
         final KafkaMessageConverter converter = new FlowFileStreamKafkaMessageConverter(
-            headerEncoding, headerNamePattern, keyEncoding, commitOffsets, offsetTracker, brokerUri);
+            headerEncoding, headerNamePattern, headerNamePrefix, keyEncoding, commitOffsets, offsetTracker, brokerUri);
         converter.toFlowFiles(session, consumerRecords);
     }
 
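For orientation, a minimal sketch of configuring the new property through NiFi's TestRunner, mirroring the integration test above. This is illustrative only: the Kafka Connection Service wiring a real flow needs is omitted, the group/topic values are made up, and the ProcessingStrategy import path is assumed to follow the KeyEncoding import used elsewhere in this change.

package org.apache.nifi.kafka.processors; // same package as ConsumeKafka, since the descriptors are package-private

import org.apache.nifi.kafka.shared.property.ProcessingStrategy; // assumed package
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

class HeaderPrefixConfigSketch {

    TestRunner configureConsumer() {
        final TestRunner runner = TestRunners.newTestRunner(ConsumeKafka.class);
        runner.setProperty(ConsumeKafka.GROUP_ID, "my-group");   // illustrative value
        runner.setProperty(ConsumeKafka.TOPICS, "my-topic");     // illustrative value
        // HEADER_NAME_PREFIX is only offered when the FLOW_FILE strategy is selected (dependsOn above)
        runner.setProperty(ConsumeKafka.PROCESSING_STRATEGY, ProcessingStrategy.FLOW_FILE.getValue());
        runner.setProperty(ConsumeKafka.HEADER_NAME_PATTERN, ".*");
        // A Kafka header named "uuid" is then written as the attribute "kafka.header.uuid"
        runner.setProperty(ConsumeKafka.HEADER_NAME_PREFIX, "kafka.header.");
        return runner;
    }
}
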
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/common/KafkaUtils.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/common/KafkaUtils.java
index 0dbfd158b1..40e1e37367 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/common/KafkaUtils.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/common/KafkaUtils.java
@@ -74,6 +74,12 @@ public class KafkaUtils {
     public static Map<String, String> toAttributes(final ByteRecord consumerRecord, final KeyEncoding keyEncoding,
                                                    final Pattern headerNamePattern, final Charset headerEncoding,
                                                    final boolean commitOffsets) {
+        return toAttributes(consumerRecord, keyEncoding, headerNamePattern, null, headerEncoding, commitOffsets);
+    }
+
+    public static Map<String, String> toAttributes(final ByteRecord consumerRecord, final KeyEncoding keyEncoding,
+                                                   final Pattern headerNamePattern, final String headerNamePrefix,
+                                                   final Charset headerEncoding, final boolean commitOffsets) {
         final Map<String, String> attributes = new HashMap<>();
         attributes.put(KafkaFlowFileAttribute.KAFKA_TOPIC, consumerRecord.getTopic());
         attributes.put(KafkaFlowFileAttribute.KAFKA_PARTITION, Long.toString(consumerRecord.getPartition()));
@@ -97,7 +103,8 @@ public class KafkaUtils {
                 final String name = header.key();
                 if (headerNamePattern.matcher(name).matches()) {
                     final String value = new String(header.value(), headerEncoding);
-                    attributes.put(name, value);
+                    final String attributeName = headerNamePrefix != null ? headerNamePrefix + name : name;
+                    attributes.put(attributeName, value);
                 }
             }
         }
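
The heart of the change is the naming rule above: the existing five-argument toAttributes() delegates with a null prefix, so current callers keep the legacy attribute names, while a non-null prefix is prepended verbatim to every header name that matches the header name pattern. A self-contained sketch of just that rule, using plain JDK types rather than the NiFi classes:

import java.util.LinkedHashMap;
import java.util.Map;

// Standalone illustration of the attribute-naming rule in KafkaUtils.toAttributes(); no NiFi dependencies.
class AttributeNameSketch {

    // Prefix the header name when a prefix is configured, otherwise keep the raw name
    static String toAttributeName(final String headerName, final String headerNamePrefix) {
        return headerNamePrefix != null ? headerNamePrefix + headerName : headerName;
    }

    public static void main(final String[] args) {
        final Map<String, String> attributes = new LinkedHashMap<>();
        attributes.put(toAttributeName("uuid", "kafka.header."), "test-uuid"); // -> kafka.header.uuid
        attributes.put(toAttributeName("uuid", null), "test-uuid");            // -> uuid (legacy behavior)
        attributes.put(toAttributeName("custom", ""), "custom-value");         // -> custom (empty prefix is a no-op)
        System.out.println(attributes);
    }
}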
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/FlowFileStreamKafkaMessageConverter.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/FlowFileStreamKafkaMessageConverter.java
index 5c5a916157..815ddcb32b 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/FlowFileStreamKafkaMessageConverter.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/FlowFileStreamKafkaMessageConverter.java
@@ -33,6 +33,7 @@ import java.util.regex.Pattern;
 public class FlowFileStreamKafkaMessageConverter implements KafkaMessageConverter {
     private final Charset headerEncoding;
     private final Pattern headerNamePattern;
+    private final String headerNamePrefix;
     private final KeyEncoding keyEncoding;
     private final boolean commitOffsets;
     private final OffsetTracker offsetTracker;
@@ -41,12 +42,14 @@ public class FlowFileStreamKafkaMessageConverter implements KafkaMessageConverte
     public FlowFileStreamKafkaMessageConverter(
             final Charset headerEncoding,
             final Pattern headerNamePattern,
+            final String headerNamePrefix,
             final KeyEncoding keyEncoding,
             final boolean commitOffsets,
             final OffsetTracker offsetTracker,
             final String brokerUri) {
         this.headerEncoding = headerEncoding;
         this.headerNamePattern = headerNamePattern;
+        this.headerNamePrefix = headerNamePrefix;
         this.keyEncoding = keyEncoding;
         this.commitOffsets = commitOffsets;
         this.offsetTracker = offsetTracker;
@@ -68,7 +71,7 @@ public class FlowFileStreamKafkaMessageConverter implements KafkaMessageConverte
             }
 
             final Map<String, String> attributes = KafkaUtils.toAttributes(
-                    consumerRecord, keyEncoding, headerNamePattern, headerEncoding, commitOffsets);
+                    consumerRecord, keyEncoding, headerNamePattern, headerNamePrefix, headerEncoding, commitOffsets);
             flowFile = session.putAllAttributes(flowFile, attributes);
 
             session.getProvenanceReporter().receive(flowFile, brokerUri + "/" + consumerRecord.getTopic());
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/common/KafkaUtilsTest.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/common/KafkaUtilsTest.java
new file mode 100644
index 0000000000..7aeedeb382
--- /dev/null
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/common/KafkaUtilsTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.processors.common;
+
+import org.apache.nifi.kafka.service.api.header.RecordHeader;
+import org.apache.nifi.kafka.service.api.record.ByteRecord;
+import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute;
+import org.apache.nifi.kafka.shared.property.KeyEncoding;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+class KafkaUtilsTest {
+
+    private static final String TEST_TOPIC = "test-topic";
+    private static final int TEST_PARTITION = 0;
+    private static final long TEST_OFFSET = 100L;
+    private static final long TEST_TIMESTAMP = System.currentTimeMillis();
+
+    @Test
+    void testToAttributesWithoutPrefix() {
+        final List<RecordHeader> headers = List.of(
+                new RecordHeader("uuid", "test-uuid".getBytes(StandardCharsets.UTF_8)),
+                new RecordHeader("custom-header", "custom-value".getBytes(StandardCharsets.UTF_8))
+        );
+
+        final ByteRecord record = createByteRecord(headers);
+        final Pattern headerPattern = Pattern.compile(".*");
+
+        final Map<String, String> attributes = KafkaUtils.toAttributes(
+                record, KeyEncoding.UTF8, headerPattern, StandardCharsets.UTF_8, true);
+
+        // Verify standard Kafka attributes
+        assertEquals(TEST_TOPIC, attributes.get(KafkaFlowFileAttribute.KAFKA_TOPIC));
+        assertEquals(String.valueOf(TEST_PARTITION), attributes.get(KafkaFlowFileAttribute.KAFKA_PARTITION));
+        assertEquals(String.valueOf(TEST_OFFSET), attributes.get(KafkaFlowFileAttribute.KAFKA_OFFSET));
+
+        // Verify headers are added without prefix
+        assertEquals("test-uuid", attributes.get("uuid"));
+        assertEquals("custom-value", attributes.get("custom-header"));
+    }
+
+    @Test
+    void testToAttributesWithPrefix() {
+        final List<RecordHeader> headers = List.of(
+                new RecordHeader("uuid", "test-uuid".getBytes(StandardCharsets.UTF_8)),
+                new RecordHeader("filename", "test-filename".getBytes(StandardCharsets.UTF_8)),
+                new RecordHeader("custom-header", "custom-value".getBytes(StandardCharsets.UTF_8))
+        );
+
+        final ByteRecord record = createByteRecord(headers);
+        final Pattern headerPattern = Pattern.compile(".*");
+        final String prefix = "kafka.header.";
+
+        final Map<String, String> attributes = KafkaUtils.toAttributes(
+                record, KeyEncoding.UTF8, headerPattern, prefix, StandardCharsets.UTF_8, true);
+
+        // Verify standard Kafka attributes are NOT prefixed
+        assertEquals(TEST_TOPIC, attributes.get(KafkaFlowFileAttribute.KAFKA_TOPIC));
+        assertEquals(String.valueOf(TEST_PARTITION), attributes.get(KafkaFlowFileAttribute.KAFKA_PARTITION));
+        assertEquals(String.valueOf(TEST_OFFSET), attributes.get(KafkaFlowFileAttribute.KAFKA_OFFSET));
+
+        // Verify headers are added WITH prefix
+        assertEquals("test-uuid", attributes.get("kafka.header.uuid"));
+        assertEquals("test-filename", attributes.get("kafka.header.filename"));
+        assertEquals("custom-value", attributes.get("kafka.header.custom-header"));
+
+        // Verify original header names are NOT present
+        assertFalse(attributes.containsKey("uuid"));
+        assertFalse(attributes.containsKey("filename"));
+        assertFalse(attributes.containsKey("custom-header"));
+    }
+
+    @Test
+    void testToAttributesWithPrefixAndPatternFiltering() {
+        final List<RecordHeader> headers = List.of(
+                new RecordHeader("uuid", "test-uuid".getBytes(StandardCharsets.UTF_8)),
+                new RecordHeader("keep-this", "keep-value".getBytes(StandardCharsets.UTF_8)),
+                new RecordHeader("skip-this", "skip-value".getBytes(StandardCharsets.UTF_8))
+        );
+
+        final ByteRecord record = createByteRecord(headers);
+        // Pattern that matches only headers starting with "uuid" or "keep"
+        final Pattern headerPattern = Pattern.compile("(uuid|keep-.*)");
+        final String prefix = "msg.";
+
+        final Map<String, String> attributes = KafkaUtils.toAttributes(
+                record, KeyEncoding.UTF8, headerPattern, prefix, StandardCharsets.UTF_8, true);
+
+        // Verify matching headers are added with prefix
+        assertEquals("test-uuid", attributes.get("msg.uuid"));
+        assertEquals("keep-value", attributes.get("msg.keep-this"));
+
+        // Verify non-matching header is NOT present (with or without prefix)
+        assertFalse(attributes.containsKey("msg.skip-this"));
+        assertFalse(attributes.containsKey("skip-this"));
+
+        // Verify header count includes all headers (not just filtered ones)
+        assertEquals("3", attributes.get(KafkaFlowFileAttribute.KAFKA_HEADER_COUNT));
+    }
+
+    @Test
+    void testToAttributesWithNullPrefix() {
+        final List<RecordHeader> headers = List.of(
+                new RecordHeader("header1", "value1".getBytes(StandardCharsets.UTF_8))
+        );
+
+        final ByteRecord record = createByteRecord(headers);
+        final Pattern headerPattern = Pattern.compile(".*");
+
+        // Explicitly pass null for prefix
+        final Map<String, String> attributes = KafkaUtils.toAttributes(
+                record, KeyEncoding.UTF8, headerPattern, null, StandardCharsets.UTF_8, true);
+
+        // Verify header is added without prefix when prefix is null
+        assertEquals("value1", attributes.get("header1"));
+        assertFalse(attributes.containsKey("nullheader1"));
+    }
+
+    @Test
+    void testToAttributesWithEmptyPrefix() {
+        final List<RecordHeader> headers = List.of(
+                new RecordHeader("header1", "value1".getBytes(StandardCharsets.UTF_8))
+        );
+
+        final ByteRecord record = createByteRecord(headers);
+        final Pattern headerPattern = Pattern.compile(".*");
+
+        // Pass empty string for prefix
+        final Map<String, String> attributes = KafkaUtils.toAttributes(
+                record, KeyEncoding.UTF8, headerPattern, "", StandardCharsets.UTF_8, true);
+
+        // Verify header is added with empty prefix (effectively no prefix)
+        assertEquals("value1", attributes.get("header1"));
+    }
+
+    private ByteRecord createByteRecord(final List<RecordHeader> headers) {
+        return new ByteRecord(
+                TEST_TOPIC,
+                TEST_PARTITION,
+                TEST_OFFSET,
+                TEST_TIMESTAMP,
+                headers,
+                null,  // key
+                "test-value".getBytes(StandardCharsets.UTF_8),
+                1  // count
+        );
+    }
+}
