This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 52bffaddf8 NIFI-14822 Added Inject Offset Output Strategy to ConsumeKafka
52bffaddf8 is described below
commit 52bffaddf84add6e420581fa9ee641d5efdaa983
Author: exceptionfactory <[email protected]>
AuthorDate: Tue Aug 5 15:40:13 2025 -0500
NIFI-14822 Added Inject Offset Output Strategy to ConsumeKafka
Signed-off-by: Pierre Villard <[email protected]>
This closes #10171.
---
.../ConsumeKafkaInjectOffsetRecordIT.java | 133 +++++++++++++++++++++
.../apache/nifi/kafka/processors/ConsumeKafka.java | 15 ++-
...ectOffsetRecordStreamKafkaMessageConverter.java | 93 ++++++++++++++
.../nifi/kafka/shared/property/OutputStrategy.java | 3 +-
4 files changed, 242 insertions(+), 2 deletions(-)
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaInjectOffsetRecordIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaInjectOffsetRecordIT.java
new file mode 100644
index 0000000000..12b9249abe
--- /dev/null
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaInjectOffsetRecordIT.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.processors;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.nifi.kafka.processors.consumer.ProcessingStrategy;
+import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
+import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute;
+import org.apache.nifi.kafka.shared.property.OutputStrategy;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration test for ConsumeKafka using the Inject Offset Output Strategy, verifying that each output record
+ * carries the offset of its source Kafka Record in the kafkaOffset field.
+ */
+class ConsumeKafkaInjectOffsetRecordIT extends AbstractConsumeKafkaIT {
+    private static final int TEST_RECORD_COUNT = 2;
+
+    private static final int FIRST_PARTITION = 0;
+
+    private static final String FIRST_OFFSET = "0";
+
+    private static final String MAX_OFFSET = "1";
+
+    /** Upper bound on how long the test polls for output before failing instead of hanging */
+    private static final long CONSUME_TIMEOUT_MILLIS = 60000;
+
+    private static final String RECORD_1 = """
+            { "id": 1, "name": "A" }
+            """;
+
+    private static final String OUTPUT_RECORD_1 = """
+            {"id":1,"name":"A","kafkaOffset":0}
+            """;
+
+    private static final String RECORD_2 = """
+            { "id": 2, "name": "B" }
+            """;
+
+    private static final String OUTPUT_RECORD_2 = """
+            {"id":2,"name":"B","kafkaOffset":1}
+            """;
+
+    private TestRunner runner;
+
+    @BeforeEach
+    void setRunner() throws InitializationException {
+        runner = TestRunners.newTestRunner(ConsumeKafka.class);
+        addKafkaConnectionService(runner);
+        runner.setProperty(ConsumeKafka.CONNECTION_SERVICE, CONNECTION_SERVICE_ID);
+        addRecordReaderService(runner);
+        addRecordWriterService(runner);
+        addRecordKeyReaderService(runner);
+    }
+
+    /**
+     * Produces two JSON records to a single partition and verifies that one FlowFile is emitted containing both
+     * records, each with its Kafka offset injected as kafkaOffset.
+     */
+    @Test
+    void testRun() throws InterruptedException, ExecutionException, IOException {
+        final String topic = UUID.randomUUID().toString();
+        final String groupId = UUID.randomUUID().toString();
+
+        runner.setProperty(ConsumeKafka.GROUP_ID, groupId);
+        runner.setProperty(ConsumeKafka.TOPICS, topic);
+        runner.setProperty(ConsumeKafka.PROCESSING_STRATEGY, ProcessingStrategy.RECORD);
+        runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, AutoOffsetReset.EARLIEST);
+        runner.setProperty(ConsumeKafka.OUTPUT_STRATEGY, OutputStrategy.INJECT_OFFSET);
+
+        // Initialize the processor without stopping it so the consumer subscribes before records are produced
+        runner.run(1, false, true);
+
+        produceOne(topic, FIRST_PARTITION, null, RECORD_1, null);
+        produceOne(topic, FIRST_PARTITION, null, RECORD_2, null);
+
+        // Bound the polling loop so that a missing delivery fails the test rather than hanging the build
+        final long deadline = System.currentTimeMillis() + CONSUME_TIMEOUT_MILLIS;
+        while (runner.getFlowFilesForRelationship(ConsumeKafka.SUCCESS).isEmpty()) {
+            assertTrue(System.currentTimeMillis() < deadline,
+                    "No FlowFiles transferred to success within " + CONSUME_TIMEOUT_MILLIS + " ms");
+            runner.run(1, false, false);
+        }
+        // Final run stops the processor so that offsets are committed and resources released
+        runner.run(1, true, false);
+
+        final List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(ConsumeKafka.SUCCESS);
+        assertEquals(1, flowFilesForRelationship.size());
+        final Iterator<MockFlowFile> flowFiles = flowFilesForRelationship.iterator();
+        assertTrue(flowFiles.hasNext());
+
+        final MockFlowFile flowFile = flowFiles.next();
+        flowFile.assertAttributeEquals(KafkaFlowFileAttribute.KAFKA_TOPIC, topic);
+        flowFile.assertAttributeEquals(KafkaFlowFileAttribute.KAFKA_PARTITION, Integer.toString(FIRST_PARTITION));
+        flowFile.assertAttributeEquals(KafkaFlowFileAttribute.KAFKA_MAX_OFFSET, MAX_OFFSET);
+        flowFile.assertAttributeEquals(KafkaFlowFileAttribute.KAFKA_OFFSET, FIRST_OFFSET);
+        flowFile.assertAttributeEquals(KafkaFlowFileAttribute.KAFKA_COUNT, Integer.toString(TEST_RECORD_COUNT));
+
+        flowFile.assertAttributeEquals("record.count", Integer.toString(TEST_RECORD_COUNT));
+
+        final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
+        assertEquals(1, provenanceEvents.size());
+        final ProvenanceEventRecord provenanceEvent = provenanceEvents.getFirst();
+        assertEquals(ProvenanceEventType.RECEIVE, provenanceEvent.getEventType());
+
+        final JsonNode jsonNodeTree = objectMapper.readTree(flowFile.getContent());
+        assertInstanceOf(ArrayNode.class, jsonNodeTree);
+        assertEquals(TEST_RECORD_COUNT, jsonNodeTree.size());
+
+        final JsonNode firstRecord = jsonNodeTree.get(0);
+        final JsonNode expectedFirstRecord = objectMapper.readTree(OUTPUT_RECORD_1);
+        assertEquals(expectedFirstRecord, firstRecord);
+
+        final JsonNode secondRecord = jsonNodeTree.get(1);
+        final JsonNode expectedSecondRecord = objectMapper.readTree(OUTPUT_RECORD_2);
+        assertEquals(expectedSecondRecord, secondRecord);
+    }
+}
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
index 24b902db87..455c5ee5dc 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
@@ -35,6 +35,7 @@ import
org.apache.nifi.kafka.processors.consumer.OffsetTracker;
import org.apache.nifi.kafka.processors.consumer.ProcessingStrategy;
import org.apache.nifi.kafka.processors.consumer.bundle.ByteRecordBundler;
import
org.apache.nifi.kafka.processors.consumer.convert.FlowFileStreamKafkaMessageConverter;
+import
org.apache.nifi.kafka.processors.consumer.convert.InjectOffsetRecordStreamKafkaMessageConverter;
import org.apache.nifi.kafka.processors.consumer.convert.KafkaMessageConverter;
import
org.apache.nifi.kafka.processors.consumer.convert.RecordStreamKafkaMessageConverter;
import
org.apache.nifi.kafka.processors.consumer.convert.WrapperRecordStreamKafkaMessageConverter;
@@ -585,7 +586,19 @@ public class ConsumeKafka extends AbstractProcessor
implements VerifiableProcess
final KafkaMessageConverter converter;
if (outputStrategy == OutputStrategy.USE_VALUE) {
converter = new RecordStreamKafkaMessageConverter(readerFactory,
writerFactory, headerEncoding, headerNamePattern,
- keyEncoding, commitOffsets, offsetTracker, getLogger(),
brokerUri);
+ keyEncoding, commitOffsets, offsetTracker, getLogger(),
brokerUri);
+ } else if (outputStrategy == OutputStrategy.INJECT_OFFSET) {
+ converter = new InjectOffsetRecordStreamKafkaMessageConverter(
+ readerFactory,
+ writerFactory,
+ headerEncoding,
+ headerNamePattern,
+ keyEncoding,
+ commitOffsets,
+ offsetTracker,
+ getLogger(),
+ brokerUri
+ );
} else {
final RecordReaderFactory keyReaderFactory = keyFormat ==
KeyFormat.RECORD
?
context.getProperty(KEY_RECORD_READER).asControllerService(RecordReaderFactory.class)
: null;
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/InjectOffsetRecordStreamKafkaMessageConverter.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/InjectOffsetRecordStreamKafkaMessageConverter.java
new file mode 100644
index 0000000000..49b4f86b82
--- /dev/null
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/InjectOffsetRecordStreamKafkaMessageConverter.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.processors.consumer.convert;
+
+import org.apache.nifi.kafka.processors.consumer.OffsetTracker;
+import org.apache.nifi.kafka.service.api.record.ByteRecord;
+import org.apache.nifi.kafka.shared.property.KeyEncoding;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * Kafka Message Converter supporting injection of the Kafka record offset in the kafkaOffset field of the output record.
+ * <p>
+ * If the schema of the Kafka Record value already contains a field named kafkaOffset, that field is replaced by a
+ * Long field holding the Kafka offset, so the output schema never contains two fields with the same name.
+ */
+public class InjectOffsetRecordStreamKafkaMessageConverter extends AbstractRecordStreamKafkaMessageConverter {
+
+    private static final String KAFKA_OFFSET = "kafkaOffset";
+
+    private static final RecordField KAFKA_OFFSET_FIELD = new RecordField(KAFKA_OFFSET, RecordFieldType.LONG.getDataType());
+
+    /**
+     * Create converter with the same collaborators as other record stream converters; all arguments are passed
+     * unchanged to the parent class.
+     *
+     * @param readerFactory Record Reader Factory for parsing Kafka Record values
+     * @param writerFactory Record Set Writer Factory for writing FlowFile content
+     * @param headerEncoding Character set used to decode Kafka Record headers
+     * @param headerNamePattern Pattern selecting headers to be written as FlowFile attributes
+     * @param keyEncoding Encoding applied to Kafka Record keys
+     * @param commitOffsets Whether consumed offsets should be committed
+     * @param offsetTracker Tracker for consumed offsets
+     * @param logger Component Log
+     * @param brokerUri Kafka Broker URI used for provenance
+     */
+    public InjectOffsetRecordStreamKafkaMessageConverter(
+            final RecordReaderFactory readerFactory,
+            final RecordSetWriterFactory writerFactory,
+            final Charset headerEncoding,
+            final Pattern headerNamePattern,
+            final KeyEncoding keyEncoding,
+            final boolean commitOffsets,
+            final OffsetTracker offsetTracker,
+            final ComponentLog logger,
+            final String brokerUri
+    ) {
+        super(
+                readerFactory,
+                writerFactory,
+                headerEncoding,
+                headerNamePattern,
+                keyEncoding,
+                commitOffsets,
+                offsetTracker,
+                logger,
+                brokerUri
+        );
+    }
+
+    /**
+     * Get write schema consisting of the input schema fields plus the Long kafkaOffset field.
+     */
+    @Override
+    protected RecordSchema getWriteSchema(final RecordSchema inputSchema, final ByteRecord consumerRecord, final Map<String, String> attributes) {
+        return getConvertedRecordSchema(inputSchema);
+    }
+
+    /**
+     * Convert record by copying its values and setting kafkaOffset to the offset of the source Kafka Record.
+     * Any existing kafkaOffset value from the Kafka Record value is overwritten.
+     */
+    @Override
+    protected Record convertRecord(final ByteRecord consumerRecord, final Record record, final Map<String, String> attributes) {
+        // Copy into a mutable map so the source Record is not modified
+        final Map<String, Object> values = new HashMap<>(record.toMap());
+
+        final long offset = consumerRecord.getOffset();
+        values.put(KAFKA_OFFSET, offset);
+
+        final RecordSchema convertedRecordSchema = getConvertedRecordSchema(record.getSchema());
+        return new MapRecord(convertedRecordSchema, values);
+    }
+
+    /**
+     * Build schema from the input fields with any existing kafkaOffset field replaced by the Long kafkaOffset field.
+     */
+    private RecordSchema getConvertedRecordSchema(final RecordSchema inputRecordSchema) {
+        final List<RecordField> schemaFields = new ArrayList<>(inputRecordSchema.getFields());
+        // Drop an existing kafkaOffset field to avoid duplicate field names in the converted schema
+        schemaFields.removeIf(field -> KAFKA_OFFSET.equals(field.getFieldName()));
+        schemaFields.add(KAFKA_OFFSET_FIELD);
+        return new SimpleRecordSchema(schemaFields);
+    }
+}
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/OutputStrategy.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/OutputStrategy.java
index d9dcd74095..da41f672da 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/OutputStrategy.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/OutputStrategy.java
@@ -25,7 +25,8 @@ public enum OutputStrategy implements DescribedValue {
USE_VALUE("USE_VALUE", "Use Content as Value", "Write only the Kafka
Record value to the FlowFile record."),
USE_WRAPPER("USE_WRAPPER", "Use Wrapper", "Write the Kafka Record key,
value, headers, and metadata into the FlowFile record. (See processor's
additional details for more information.)"),
INJECT_METADATA("INJECT_METADATA", "Inject Metadata",
- "Write the Kafka Record value to the FlowFile record and add a
sub-record to it with key, headers, and metadata. (See processor's additional
details for more information.)");
+ "Write the Kafka Record value to the FlowFile record and add a
sub-record to it with key, headers, and metadata. (See processor's additional
details for more information.)"),
+ INJECT_OFFSET("INJECT_OFFSET", "Inject Offset", "Write the Kafka Record
value to the FlowFile record and write the Kafka Record offset to a field named
kafkaOffset");
private final String value;
private final String displayName;