This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 52bffaddf8 NIFI-14822 Added Inject Offset Output Strategy to ConsumeKafka
52bffaddf8 is described below

commit 52bffaddf84add6e420581fa9ee641d5efdaa983
Author: exceptionfactory <[email protected]>
AuthorDate: Tue Aug 5 15:40:13 2025 -0500

    NIFI-14822 Added Inject Offset Output Strategy to ConsumeKafka
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #10171.
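
    The new INJECT_OFFSET strategy writes each Kafka Record value to the
    output FlowFile record and adds the record's offset as a kafkaOffset
    field. For example, as exercised by the integration test below, a
    consumed value of { "id": 1, "name": "A" } at offset 0 is written as
    {"id":1,"name":"A","kafkaOffset":0}.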
---
 .../ConsumeKafkaInjectOffsetRecordIT.java          | 133 +++++++++++++++++++++
 .../apache/nifi/kafka/processors/ConsumeKafka.java |  15 ++-
 ...ectOffsetRecordStreamKafkaMessageConverter.java |  93 ++++++++++++++
 .../nifi/kafka/shared/property/OutputStrategy.java |   3 +-
 4 files changed, 242 insertions(+), 2 deletions(-)

diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaInjectOffsetRecordIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaInjectOffsetRecordIT.java
new file mode 100644
index 0000000000..12b9249abe
--- /dev/null
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaInjectOffsetRecordIT.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.processors;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.nifi.kafka.processors.consumer.ProcessingStrategy;
+import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
+import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute;
+import org.apache.nifi.kafka.shared.property.OutputStrategy;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class ConsumeKafkaInjectOffsetRecordIT extends AbstractConsumeKafkaIT {
+    private static final int TEST_RECORD_COUNT = 2;
+
+    private static final int FIRST_PARTITION = 0;
+
+    private static final String FIRST_OFFSET = "0";
+
+    private static final String MAX_OFFSET = "1";
+
+    private static final String RECORD_1 = """
+            { "id": 1, "name": "A" }
+            """;
+
+    private static final String OUTPUT_RECORD_1 = """
+            {"id":1,"name":"A","kafkaOffset":0}
+            """;
+
+    private static final String RECORD_2 = """
+            { "id": 2, "name": "B" }
+            """;
+
+    private static final String OUTPUT_RECORD_2 = """
+            {"id":2,"name":"B","kafkaOffset":1}
+            """;
+
+    private TestRunner runner;
+
+    @BeforeEach
+    void setRunner() throws InitializationException {
+        runner = TestRunners.newTestRunner(ConsumeKafka.class);
+        addKafkaConnectionService(runner);
+        runner.setProperty(ConsumeKafka.CONNECTION_SERVICE, CONNECTION_SERVICE_ID);
+        addRecordReaderService(runner);
+        addRecordWriterService(runner);
+        addRecordKeyReaderService(runner);
+    }
+
+    @Test
+    void testRun() throws InterruptedException, ExecutionException, IOException {
+        final String topic = UUID.randomUUID().toString();
+        final String groupId = UUID.randomUUID().toString();
+
+        runner.setProperty(ConsumeKafka.GROUP_ID, groupId);
+        runner.setProperty(ConsumeKafka.TOPICS, topic);
+        runner.setProperty(ConsumeKafka.PROCESSING_STRATEGY, ProcessingStrategy.RECORD);
+        runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, AutoOffsetReset.EARLIEST);
+        runner.setProperty(ConsumeKafka.OUTPUT_STRATEGY, OutputStrategy.INJECT_OFFSET);
+
+        runner.run(1, false, true);
+
+        produceOne(topic, FIRST_PARTITION, null, RECORD_1, null);
+        produceOne(topic, FIRST_PARTITION, null, RECORD_2, null);
+
+        while (runner.getFlowFilesForRelationship(ConsumeKafka.SUCCESS).isEmpty()) {
+            runner.run(1, false, false);
+        }
+        runner.run(1, true, false);
+
+        final List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(ConsumeKafka.SUCCESS);
+        assertEquals(1, flowFilesForRelationship.size());
+        final Iterator<MockFlowFile> flowFiles = flowFilesForRelationship.iterator();
+        assertTrue(flowFiles.hasNext());
+
+        final MockFlowFile flowFile = flowFiles.next();
+        flowFile.assertAttributeEquals(KafkaFlowFileAttribute.KAFKA_TOPIC, topic);
+        flowFile.assertAttributeEquals(KafkaFlowFileAttribute.KAFKA_PARTITION, Integer.toString(FIRST_PARTITION));
+        flowFile.assertAttributeEquals(KafkaFlowFileAttribute.KAFKA_MAX_OFFSET, MAX_OFFSET);
+        flowFile.assertAttributeEquals(KafkaFlowFileAttribute.KAFKA_OFFSET, FIRST_OFFSET);
+        flowFile.assertAttributeEquals(KafkaFlowFileAttribute.KAFKA_COUNT, Integer.toString(TEST_RECORD_COUNT));
+
+        flowFile.assertAttributeEquals("record.count", 
Integer.toString(TEST_RECORD_COUNT));
+
+        final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
+        assertEquals(1, provenanceEvents.size());
+        final ProvenanceEventRecord provenanceEvent = provenanceEvents.getFirst();
+        assertEquals(ProvenanceEventType.RECEIVE, provenanceEvent.getEventType());
+
+        final JsonNode jsonNodeTree = objectMapper.readTree(flowFile.getContent());
+        assertInstanceOf(ArrayNode.class, jsonNodeTree);
+        assertEquals(2, jsonNodeTree.size());
+
+        final JsonNode firstRecord = jsonNodeTree.get(0);
+        final JsonNode expectedFirstRecord = objectMapper.readTree(OUTPUT_RECORD_1);
+        assertEquals(expectedFirstRecord, firstRecord);
+
+        final JsonNode secondRecord = jsonNodeTree.get(1);
+        final JsonNode expectedSecondRecord = objectMapper.readTree(OUTPUT_RECORD_2);
+        assertEquals(expectedSecondRecord, secondRecord);
+    }
+}
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
index 24b902db87..455c5ee5dc 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
@@ -35,6 +35,7 @@ import org.apache.nifi.kafka.processors.consumer.OffsetTracker;
 import org.apache.nifi.kafka.processors.consumer.ProcessingStrategy;
 import org.apache.nifi.kafka.processors.consumer.bundle.ByteRecordBundler;
 import org.apache.nifi.kafka.processors.consumer.convert.FlowFileStreamKafkaMessageConverter;
+import org.apache.nifi.kafka.processors.consumer.convert.InjectOffsetRecordStreamKafkaMessageConverter;
 import org.apache.nifi.kafka.processors.consumer.convert.KafkaMessageConverter;
 import org.apache.nifi.kafka.processors.consumer.convert.RecordStreamKafkaMessageConverter;
 import org.apache.nifi.kafka.processors.consumer.convert.WrapperRecordStreamKafkaMessageConverter;
@@ -585,7 +586,19 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
         final KafkaMessageConverter converter;
         if (outputStrategy == OutputStrategy.USE_VALUE) {
             converter = new RecordStreamKafkaMessageConverter(readerFactory, writerFactory, headerEncoding, headerNamePattern,
-                keyEncoding, commitOffsets, offsetTracker, getLogger(), brokerUri);
+                    keyEncoding, commitOffsets, offsetTracker, getLogger(), brokerUri);
+        } else if (outputStrategy == OutputStrategy.INJECT_OFFSET) {
+            converter = new InjectOffsetRecordStreamKafkaMessageConverter(
+                    readerFactory,
+                    writerFactory,
+                    headerEncoding,
+                    headerNamePattern,
+                    keyEncoding,
+                    commitOffsets,
+                    offsetTracker,
+                    getLogger(),
+                    brokerUri
+            );
         } else {
             final RecordReaderFactory keyReaderFactory = keyFormat == KeyFormat.RECORD
                 ? context.getProperty(KEY_RECORD_READER).asControllerService(RecordReaderFactory.class) : null;
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/InjectOffsetRecordStreamKafkaMessageConverter.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/InjectOffsetRecordStreamKafkaMessageConverter.java
new file mode 100644
index 0000000000..49b4f86b82
--- /dev/null
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/InjectOffsetRecordStreamKafkaMessageConverter.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.processors.consumer.convert;
+
+import org.apache.nifi.kafka.processors.consumer.OffsetTracker;
+import org.apache.nifi.kafka.service.api.record.ByteRecord;
+import org.apache.nifi.kafka.shared.property.KeyEncoding;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * Kafka Message Converter supporting injection of Kafka record offset in the kafkaOffset field of the output record
+ */
+public class InjectOffsetRecordStreamKafkaMessageConverter extends AbstractRecordStreamKafkaMessageConverter {
+
+    private static final String KAFKA_OFFSET = "kafkaOffset";
+
+    private static final RecordField KAFKA_OFFSET_FIELD = new RecordField(KAFKA_OFFSET, RecordFieldType.LONG.getDataType());
+
+    public InjectOffsetRecordStreamKafkaMessageConverter(
+            final RecordReaderFactory readerFactory,
+            final RecordSetWriterFactory writerFactory,
+            final Charset headerEncoding,
+            final Pattern headerNamePattern,
+            final KeyEncoding keyEncoding,
+            final boolean commitOffsets,
+            final OffsetTracker offsetTracker,
+            final ComponentLog logger,
+            final String brokerUri
+    ) {
+        super(
+                readerFactory,
+                writerFactory,
+                headerEncoding,
+                headerNamePattern,
+                keyEncoding,
+                commitOffsets,
+                offsetTracker,
+                logger,
+                brokerUri
+        );
+    }
+
+    @Override
+    protected RecordSchema getWriteSchema(final RecordSchema inputSchema, final ByteRecord consumerRecord, final Map<String, String> attributes) {
+        return getConvertedRecordSchema(inputSchema);
+    }
+
+    @Override
+    protected Record convertRecord(final ByteRecord consumerRecord, final Record record, final Map<String, String> attributes) {
+        final Map<String, Object> values = new HashMap<>(record.toMap());
+
+        final long offset = consumerRecord.getOffset();
+        values.put(KAFKA_OFFSET, offset);
+
+        final RecordSchema convertedRecordSchema = getConvertedRecordSchema(record.getSchema());
+        return new MapRecord(convertedRecordSchema, values);
+    }
+
+    private RecordSchema getConvertedRecordSchema(final RecordSchema inputRecordSchema) {
+        final List<RecordField> schemaFields = new ArrayList<>(inputRecordSchema.getFields());
+        schemaFields.add(KAFKA_OFFSET_FIELD);
+        return new SimpleRecordSchema(schemaFields);
+    }
+}
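
The converter above follows a simple pattern: copy the consumed record's
values, add the offset under a kafkaOffset key, and rebuild the record
against a schema extended with a LONG kafkaOffset field. Below is a minimal
standalone sketch of that pattern; InjectOffsetSketch and withOffset are
hypothetical names, and only the nifi-record API types already shown in the
diff above are assumed.

import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class InjectOffsetSketch {
    // Hypothetical helper mirroring convertRecord(): returns a new Record
    // holding the input record's fields plus a LONG kafkaOffset field
    static Record withOffset(final Record record, final long offset) {
        // Extend the input schema with the kafkaOffset field
        final List<RecordField> fields = new ArrayList<>(record.getSchema().getFields());
        fields.add(new RecordField("kafkaOffset", RecordFieldType.LONG.getDataType()));
        final RecordSchema extendedSchema = new SimpleRecordSchema(fields);

        // Copy the existing values and inject the offset under the new field
        final Map<String, Object> values = new HashMap<>(record.toMap());
        values.put("kafkaOffset", offset);
        return new MapRecord(extendedSchema, values);
    }
}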
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/OutputStrategy.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/OutputStrategy.java
index d9dcd74095..da41f672da 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/OutputStrategy.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/OutputStrategy.java
@@ -25,7 +25,8 @@ public enum OutputStrategy implements DescribedValue {
     USE_VALUE("USE_VALUE", "Use Content as Value", "Write only the Kafka 
Record value to the FlowFile record."),
     USE_WRAPPER("USE_WRAPPER", "Use Wrapper", "Write the Kafka Record key, 
value, headers, and metadata into the FlowFile record. (See processor's 
additional details for more information.)"),
     INJECT_METADATA("INJECT_METADATA", "Inject Metadata",
-            "Write the Kafka Record value to the FlowFile record and add a 
sub-record to it with key, headers, and metadata. (See processor's additional 
details for more information.)");
+            "Write the Kafka Record value to the FlowFile record and add a 
sub-record to it with key, headers, and metadata. (See processor's additional 
details for more information.)"),
+    INJECT_OFFSET("INJECT_OFFSET", "Inject Offset", "Write the Kafka Record 
value to the FlowFile record and write the Kafka Record offset to a field named 
kafkaOffset");
 
     private final String value;
     private final String displayName;
