This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 96ba9cc710 NIFI-15550 - Add optional header prefix property in ConsumeKafka
96ba9cc710 is described below
commit 96ba9cc710f2ca81a8f52e99d36a4bf75144bd62
Author: Pierre Villard <[email protected]>
AuthorDate: Wed Feb 4 18:26:19 2026 +0100
NIFI-15550 - Add optional header prefix property in ConsumeKafka
This closes #10853.
Signed-off-by: Peter Turcsanyi <[email protected]>
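For quick orientation, the effect of the new property is easiest to see in processor
configuration. Below is a minimal sketch (not code from this commit; imports omitted,
and TestRunner/TestRunners are assumed from NiFi's standard mock framework) that
mirrors the integration test added further down:

    final TestRunner runner = TestRunners.newTestRunner(ConsumeKafka.class);
    runner.setProperty(ConsumeKafka.PROCESSING_STRATEGY, ProcessingStrategy.FLOW_FILE.getValue());
    runner.setProperty(ConsumeKafka.HEADER_NAME_PATTERN, ".*");
    // New in this commit: prefix applied to every matched Kafka header name
    runner.setProperty(ConsumeKafka.HEADER_NAME_PREFIX, "kafka.header.");
    // A Kafka record header named "uuid" now becomes the FlowFile attribute
    // "kafka.header.uuid", so the core "uuid" attribute is never overwritten.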
---
.../nifi/kafka/processors/ConsumeKafkaIT.java | 48 ++++++
.../apache/nifi/kafka/processors/ConsumeKafka.java | 21 ++-
.../nifi/kafka/processors/common/KafkaUtils.java | 9 +-
.../FlowFileStreamKafkaMessageConverter.java | 5 +-
.../kafka/processors/common/KafkaUtilsTest.java | 169 +++++++++++++++++++++
5 files changed, 249 insertions(+), 3 deletions(-)
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java
index 03523a8cf3..e4e12898e3 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java
@@ -116,6 +116,54 @@ class ConsumeKafkaIT extends AbstractConsumeKafkaIT {
         flowFile.assertAttributeNotExists("ccc");
     }
 
+    @Test
+    void testProcessingStrategyFlowFileHeaderNamePrefix() throws InterruptedException, ExecutionException {
+        final String topic = UUID.randomUUID().toString();
+        final String groupId = topic.substring(0, topic.indexOf("-"));
+
+        runner.setProperty(ConsumeKafka.GROUP_ID, groupId);
+        runner.setProperty(ConsumeKafka.TOPICS, topic);
+        runner.setProperty(ConsumeKafka.PROCESSING_STRATEGY, ProcessingStrategy.FLOW_FILE.getValue());
+        runner.setProperty(ConsumeKafka.HEADER_NAME_PATTERN, ".*");
+        runner.setProperty(ConsumeKafka.HEADER_NAME_PREFIX, "kafka.header.");
+
+        runner.run(1, false, true);
+
+        final List<Header> headers = Arrays.asList(
+                new RecordHeader("uuid", "test-uuid-value".getBytes(StandardCharsets.UTF_8)),
+                new RecordHeader("filename", "test-filename".getBytes(StandardCharsets.UTF_8)),
+                new RecordHeader("custom", "custom-value".getBytes(StandardCharsets.UTF_8)));
+        produceOne(topic, 0, null, RECORD_VALUE, headers);
+        while (runner.getFlowFilesForRelationship("success").isEmpty()) {
+            runner.run(1, false, false);
+        }
+
+        runner.run(1, true, false);
+
+        final Iterator<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.SUCCESS).iterator();
+        assertTrue(flowFiles.hasNext());
+
+        final MockFlowFile flowFile = flowFiles.next();
+        flowFile.assertContentEquals(RECORD_VALUE);
+        flowFile.assertAttributeEquals(KafkaFlowFileAttribute.KAFKA_TOPIC, topic);
+        flowFile.assertAttributeEquals(KafkaFlowFileAttribute.KAFKA_PARTITION, Integer.toString(FIRST_PARTITION));
+        flowFile.assertAttributeEquals(KafkaFlowFileAttribute.KAFKA_OFFSET, Long.toString(FIRST_OFFSET));
+        flowFile.assertAttributeExists(KafkaFlowFileAttribute.KAFKA_TIMESTAMP);
+        flowFile.assertAttributeEquals(KafkaFlowFileAttribute.KAFKA_HEADER_COUNT, "3");
+
+        // Verify headers are prefixed with "kafka.header."
+        flowFile.assertAttributeEquals("kafka.header.uuid", "test-uuid-value");
+        flowFile.assertAttributeEquals("kafka.header.filename", "test-filename");
+        flowFile.assertAttributeEquals("kafka.header.custom", "custom-value");
+
+        // Verify the Kafka header values did NOT overwrite the core FlowFile attributes
+        // Note: "uuid" and "filename" are core FlowFile attributes that always exist,
+        // but they should NOT contain the Kafka header values when prefix is used
+        flowFile.assertAttributeExists("uuid"); // Core attribute exists
+        flowFile.assertAttributeNotEquals("uuid", "test-uuid-value"); // But not with Kafka header value
+        flowFile.assertAttributeNotExists("custom"); // Non-core attributes should not exist without prefix
+    }
+
     /**
      * Test ability to specify a topic regular expression to query for messages.
      */
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
index 455c5ee5dc..336f6e77d9 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
@@ -210,6 +210,18 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
             .expressionLanguageSupported(NONE)
             .build();
 
+    static final PropertyDescriptor HEADER_NAME_PREFIX = new PropertyDescriptor.Builder()
+            .name("Header Name Prefix")
+            .description("""
+                    A prefix to apply to the FlowFile attribute name when writing Kafka Record Header values.
+                    This is useful to avoid conflicts with reserved FlowFile attribute names such as 'uuid'.
+                    For example, if set to 'kafka.header.', a Kafka header named 'uuid' would be written as 'kafka.header.uuid'.
+                    """)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .required(false)
+            .dependsOn(PROCESSING_STRATEGY, ProcessingStrategy.FLOW_FILE)
+            .build();
+
     static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
             .name("Record Reader")
             .description("The Record Reader to use for incoming Kafka messages")
@@ -304,6 +316,7 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
             HEADER_NAME_PATTERN,
             HEADER_ENCODING,
             PROCESSING_STRATEGY,
+            HEADER_NAME_PREFIX,
             RECORD_READER,
             RECORD_WRITER,
             OUTPUT_STRATEGY,
@@ -319,6 +332,7 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
 
     private volatile Charset headerEncoding;
     private volatile Pattern headerNamePattern;
+    private volatile String headerNamePrefix;
     private volatile ProcessingStrategy processingStrategy;
     private volatile KeyEncoding keyEncoding;
     private volatile OutputStrategy outputStrategy;
@@ -366,6 +380,11 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
         keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).asAllowableValue(KeyEncoding.class);
         commitOffsets = context.getProperty(COMMIT_OFFSETS).asBoolean();
         processingStrategy = context.getProperty(PROCESSING_STRATEGY).asAllowableValue(ProcessingStrategy.class);
+
+        // Only read HEADER_NAME_PREFIX when PROCESSING_STRATEGY is FLOW_FILE (property dependency)
+        headerNamePrefix = processingStrategy == ProcessingStrategy.FLOW_FILE
+                ? context.getProperty(HEADER_NAME_PREFIX).getValue()
+                : null;
         outputStrategy = processingStrategy == ProcessingStrategy.RECORD ? context.getProperty(OUTPUT_STRATEGY).asAllowableValue(OutputStrategy.class) : null;
         keyFormat = (outputStrategy == OutputStrategy.USE_WRAPPER || outputStrategy == OutputStrategy.INJECT_METADATA)
                 ? context.getProperty(KEY_FORMAT).asAllowableValue(KeyFormat.class)
@@ -612,7 +631,7 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
 
     private void processInputFlowFile(final ProcessSession session, final OffsetTracker offsetTracker, final Iterator<ByteRecord> consumerRecords) {
         final KafkaMessageConverter converter = new FlowFileStreamKafkaMessageConverter(
-                headerEncoding, headerNamePattern, keyEncoding, commitOffsets, offsetTracker, brokerUri);
+                headerEncoding, headerNamePattern, headerNamePrefix, keyEncoding, commitOffsets, offsetTracker, brokerUri);
         converter.toFlowFiles(session, consumerRecords);
     }
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/common/KafkaUtils.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/common/KafkaUtils.java
index 0dbfd158b1..40e1e37367 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/common/KafkaUtils.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/common/KafkaUtils.java
@@ -74,6 +74,12 @@ public class KafkaUtils {
     public static Map<String, String> toAttributes(final ByteRecord consumerRecord, final KeyEncoding keyEncoding,
                                                    final Pattern headerNamePattern, final Charset headerEncoding,
                                                    final boolean commitOffsets) {
+        return toAttributes(consumerRecord, keyEncoding, headerNamePattern, null, headerEncoding, commitOffsets);
+    }
+
+    public static Map<String, String> toAttributes(final ByteRecord consumerRecord, final KeyEncoding keyEncoding,
+                                                   final Pattern headerNamePattern, final String headerNamePrefix,
+                                                   final Charset headerEncoding, final boolean commitOffsets) {
         final Map<String, String> attributes = new HashMap<>();
         attributes.put(KafkaFlowFileAttribute.KAFKA_TOPIC, consumerRecord.getTopic());
         attributes.put(KafkaFlowFileAttribute.KAFKA_PARTITION, Long.toString(consumerRecord.getPartition()));
@@ -97,7 +103,8 @@ public class KafkaUtils {
                 final String name = header.key();
                 if (headerNamePattern.matcher(name).matches()) {
                     final String value = new String(header.value(), headerEncoding);
-                    attributes.put(name, value);
+                    final String attributeName = headerNamePrefix != null ? headerNamePrefix + name : name;
+                    attributes.put(attributeName, value);
                 }
             }
         }
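The attribute-naming rule introduced above is a straight prepend. As a standalone
illustration (hypothetical helper for this email, not code from the commit):

    // Mirrors the logic added in KafkaUtils: a non-null prefix is prepended
    // verbatim to the matched header key; a null prefix leaves it unchanged.
    static String toAttributeName(final String headerNamePrefix, final String name) {
        return headerNamePrefix != null ? headerNamePrefix + name : name;
    }
    // toAttributeName("kafka.header.", "uuid") -> "kafka.header.uuid"
    // toAttributeName(null, "uuid")            -> "uuid"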
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/FlowFileStreamKafkaMessageConverter.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/FlowFileStreamKafkaMessageConverter.java
index 5c5a916157..815ddcb32b 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/FlowFileStreamKafkaMessageConverter.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/FlowFileStreamKafkaMessageConverter.java
@@ -33,6 +33,7 @@ import java.util.regex.Pattern;
 public class FlowFileStreamKafkaMessageConverter implements KafkaMessageConverter {
     private final Charset headerEncoding;
     private final Pattern headerNamePattern;
+    private final String headerNamePrefix;
     private final KeyEncoding keyEncoding;
     private final boolean commitOffsets;
     private final OffsetTracker offsetTracker;
@@ -41,12 +42,14 @@ public class FlowFileStreamKafkaMessageConverter implements KafkaMessageConverte
     public FlowFileStreamKafkaMessageConverter(
             final Charset headerEncoding,
             final Pattern headerNamePattern,
+            final String headerNamePrefix,
             final KeyEncoding keyEncoding,
             final boolean commitOffsets,
             final OffsetTracker offsetTracker,
             final String brokerUri) {
         this.headerEncoding = headerEncoding;
         this.headerNamePattern = headerNamePattern;
+        this.headerNamePrefix = headerNamePrefix;
         this.keyEncoding = keyEncoding;
         this.commitOffsets = commitOffsets;
         this.offsetTracker = offsetTracker;
@@ -68,7 +71,7 @@ public class FlowFileStreamKafkaMessageConverter implements KafkaMessageConverte
         }
 
         final Map<String, String> attributes = KafkaUtils.toAttributes(
-                consumerRecord, keyEncoding, headerNamePattern, headerEncoding, commitOffsets);
+                consumerRecord, keyEncoding, headerNamePattern, headerNamePrefix, headerEncoding, commitOffsets);
         flowFile = session.putAllAttributes(flowFile, attributes);
 
         session.getProvenanceReporter().receive(flowFile, brokerUri + "/" + consumerRecord.getTopic());
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/common/KafkaUtilsTest.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/common/KafkaUtilsTest.java
new file mode 100644
index 0000000000..7aeedeb382
--- /dev/null
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/common/KafkaUtilsTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.processors.common;
+
+import org.apache.nifi.kafka.service.api.header.RecordHeader;
+import org.apache.nifi.kafka.service.api.record.ByteRecord;
+import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute;
+import org.apache.nifi.kafka.shared.property.KeyEncoding;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+class KafkaUtilsTest {
+
+    private static final String TEST_TOPIC = "test-topic";
+    private static final int TEST_PARTITION = 0;
+    private static final long TEST_OFFSET = 100L;
+    private static final long TEST_TIMESTAMP = System.currentTimeMillis();
+
+    @Test
+    void testToAttributesWithoutPrefix() {
+        final List<RecordHeader> headers = List.of(
+                new RecordHeader("uuid", "test-uuid".getBytes(StandardCharsets.UTF_8)),
+                new RecordHeader("custom-header", "custom-value".getBytes(StandardCharsets.UTF_8))
+        );
+
+        final ByteRecord record = createByteRecord(headers);
+        final Pattern headerPattern = Pattern.compile(".*");
+
+        final Map<String, String> attributes = KafkaUtils.toAttributes(
+                record, KeyEncoding.UTF8, headerPattern, StandardCharsets.UTF_8, true);
+
+        // Verify standard Kafka attributes
+        assertEquals(TEST_TOPIC, attributes.get(KafkaFlowFileAttribute.KAFKA_TOPIC));
+        assertEquals(String.valueOf(TEST_PARTITION), attributes.get(KafkaFlowFileAttribute.KAFKA_PARTITION));
+        assertEquals(String.valueOf(TEST_OFFSET), attributes.get(KafkaFlowFileAttribute.KAFKA_OFFSET));
+
+        // Verify headers are added without prefix
+        assertEquals("test-uuid", attributes.get("uuid"));
+        assertEquals("custom-value", attributes.get("custom-header"));
+    }
+
+    @Test
+    void testToAttributesWithPrefix() {
+        final List<RecordHeader> headers = List.of(
+                new RecordHeader("uuid", "test-uuid".getBytes(StandardCharsets.UTF_8)),
+                new RecordHeader("filename", "test-filename".getBytes(StandardCharsets.UTF_8)),
+                new RecordHeader("custom-header", "custom-value".getBytes(StandardCharsets.UTF_8))
+        );
+
+        final ByteRecord record = createByteRecord(headers);
+        final Pattern headerPattern = Pattern.compile(".*");
+        final String prefix = "kafka.header.";
+
+        final Map<String, String> attributes = KafkaUtils.toAttributes(
+                record, KeyEncoding.UTF8, headerPattern, prefix, StandardCharsets.UTF_8, true);
+
+        // Verify standard Kafka attributes are NOT prefixed
+        assertEquals(TEST_TOPIC, attributes.get(KafkaFlowFileAttribute.KAFKA_TOPIC));
+        assertEquals(String.valueOf(TEST_PARTITION), attributes.get(KafkaFlowFileAttribute.KAFKA_PARTITION));
+        assertEquals(String.valueOf(TEST_OFFSET), attributes.get(KafkaFlowFileAttribute.KAFKA_OFFSET));
+
+        // Verify headers are added WITH prefix
+        assertEquals("test-uuid", attributes.get("kafka.header.uuid"));
+        assertEquals("test-filename", attributes.get("kafka.header.filename"));
+        assertEquals("custom-value", attributes.get("kafka.header.custom-header"));
+
+        // Verify original header names are NOT present
+        assertFalse(attributes.containsKey("uuid"));
+        assertFalse(attributes.containsKey("filename"));
+        assertFalse(attributes.containsKey("custom-header"));
+    }
+
+    @Test
+    void testToAttributesWithPrefixAndPatternFiltering() {
+        final List<RecordHeader> headers = List.of(
+                new RecordHeader("uuid", "test-uuid".getBytes(StandardCharsets.UTF_8)),
+                new RecordHeader("keep-this", "keep-value".getBytes(StandardCharsets.UTF_8)),
+                new RecordHeader("skip-this", "skip-value".getBytes(StandardCharsets.UTF_8))
+        );
+
+        final ByteRecord record = createByteRecord(headers);
+        // Pattern that matches only headers starting with "uuid" or "keep"
+        final Pattern headerPattern = Pattern.compile("(uuid|keep-.*)");
+        final String prefix = "msg.";
+
+        final Map<String, String> attributes = KafkaUtils.toAttributes(
+                record, KeyEncoding.UTF8, headerPattern, prefix, StandardCharsets.UTF_8, true);
+
+        // Verify matching headers are added with prefix
+        assertEquals("test-uuid", attributes.get("msg.uuid"));
+        assertEquals("keep-value", attributes.get("msg.keep-this"));
+
+        // Verify non-matching header is NOT present (with or without prefix)
+        assertFalse(attributes.containsKey("msg.skip-this"));
+        assertFalse(attributes.containsKey("skip-this"));
+
+        // Verify header count includes all headers (not just filtered ones)
+        assertEquals("3", attributes.get(KafkaFlowFileAttribute.KAFKA_HEADER_COUNT));
+    }
+
+    @Test
+    void testToAttributesWithNullPrefix() {
+        final List<RecordHeader> headers = List.of(
+                new RecordHeader("header1", "value1".getBytes(StandardCharsets.UTF_8))
+        );
+
+        final ByteRecord record = createByteRecord(headers);
+        final Pattern headerPattern = Pattern.compile(".*");
+
+        // Explicitly pass null for prefix
+        final Map<String, String> attributes = KafkaUtils.toAttributes(
+                record, KeyEncoding.UTF8, headerPattern, null, StandardCharsets.UTF_8, true);
+
+        // Verify header is added without prefix when prefix is null
+        assertEquals("value1", attributes.get("header1"));
+        assertFalse(attributes.containsKey("nullheader1"));
+    }
+
+    @Test
+    void testToAttributesWithEmptyPrefix() {
+        final List<RecordHeader> headers = List.of(
+                new RecordHeader("header1", "value1".getBytes(StandardCharsets.UTF_8))
+        );
+
+        final ByteRecord record = createByteRecord(headers);
+        final Pattern headerPattern = Pattern.compile(".*");
+
+        // Pass empty string for prefix
+        final Map<String, String> attributes = KafkaUtils.toAttributes(
+                record, KeyEncoding.UTF8, headerPattern, "", StandardCharsets.UTF_8, true);
+
+        // Verify header is added with empty prefix (effectively no prefix)
+        assertEquals("value1", attributes.get("header1"));
+    }
+
+    private ByteRecord createByteRecord(final List<RecordHeader> headers) {
+        return new ByteRecord(
+                TEST_TOPIC,
+                TEST_PARTITION,
+                TEST_OFFSET,
+                TEST_TIMESTAMP,
+                headers,
+                null, // key
+                "test-value".getBytes(StandardCharsets.UTF_8),
+                1 // count
+        );
+    }
+}