This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 4b71f10aec NIFI-14619 Added kafka.max.offset to Record FlowFiles from ConsumeKafka This closes #9983.
4b71f10aec is described below

commit 4b71f10aec31cb0ca7b2a3318c985e2d6c9131fd
Author: exceptionfactory <[email protected]>
AuthorDate: Fri May 30 15:12:31 2025 -0500

    NIFI-14619 Added kafka.max.offset to Record FlowFiles from ConsumeKafka
    This closes #9983.
    
    Signed-off-by: Joseph Witt <[email protected]>
---
 .../nifi/kafka/processors/ConsumeKafkaRecordIT.java   |  2 ++
 .../apache/nifi/kafka/processors/ConsumeKafka.java    |  3 ++-
 .../AbstractRecordStreamKafkaMessageConverter.java    | 19 ++++++++++++++++---
 3 files changed, 20 insertions(+), 4 deletions(-)
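
In short, the converter now tracks the highest offset observed for each record group and writes it to the new kafka.max.offset attribute when the group is flushed. The following standalone sketch (hypothetical names, not the NiFi classes themselves) illustrates the running-maximum pattern the diff below applies per RecordGroup:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicLong;

    // Sketch only: keep the highest offset seen per group key, as the
    // converter does per RecordGroup. Like the diff, the get-then-set
    // pair is not atomic, which is fine for single-threaded use.
    class MaxOffsetTracker {
        private final Map<String, AtomicLong> maxOffsets = new HashMap<>();

        void track(final String groupKey, final long offset) {
            final AtomicLong max = maxOffsets.computeIfAbsent(groupKey, key -> new AtomicLong(offset));
            if (offset > max.get()) {
                max.set(offset);
            }
        }

        long maxOffset(final String groupKey) {
            return maxOffsets.get(groupKey).get();
        }
    }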

diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaRecordIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaRecordIT.java
index d7c3508e56..6e8b37d6fc 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaRecordIT.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaRecordIT.java
@@ -168,6 +168,8 @@ class ConsumeKafkaRecordIT extends AbstractConsumeKafkaIT {
         flowFile.assertContentEquals(flowFileString);
         flowFile.assertAttributeEquals(KafkaFlowFileAttribute.KAFKA_TOPIC, topic);
         flowFile.assertAttributeEquals(KafkaFlowFileAttribute.KAFKA_PARTITION, Integer.toString(FIRST_PARTITION));
+        flowFile.assertAttributeExists(KafkaFlowFileAttribute.KAFKA_MAX_OFFSET);
+
         flowFile.assertAttributeEquals("record.count", 
Long.toString(TEST_RECORD_COUNT));
         flowFile.assertAttributeEquals("aaa", "value");
         flowFile.assertAttributeNotExists("bbb");
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
index 4643a02733..ad1bd6d2c8 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
@@ -96,7 +96,8 @@ import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
         @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TIMESTAMP, description = "The timestamp of the message in the partition of the topic."),
         @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_PARTITION, description = "The partition of the topic the message or message bundle is from"),
         @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TOPIC, description = "The topic the message or message bundle is from"),
-        @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TOMBSTONE, description = "Set to true if the consumed message is a tombstone message")
+        @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TOMBSTONE, description = "Set to true if the consumed message is a tombstone message"),
+        @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_MAX_OFFSET, description = "The maximum value of the Kafka offset in the batch of records")
 })
 @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
 @SeeAlso({PublishKafka.class})
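
Downstream of ConsumeKafka, the new attribute can be read like any other FlowFile attribute. A hypothetical helper (the "kafka.max.offset" key comes from the commit subject; the surrounding code is illustrative only, not part of this commit):

    import org.apache.nifi.flowfile.FlowFile;

    class MaxOffsetReader {
        // Hypothetical usage: parse the batch's maximum Kafka offset,
        // returning -1 when the attribute is absent.
        static long readMaxOffset(final FlowFile flowFile) {
            final String value = flowFile.getAttribute("kafka.max.offset");
            return value == null ? -1L : Long.parseLong(value);
        }
    }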
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/AbstractRecordStreamKafkaMessageConverter.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/AbstractRecordStreamKafkaMessageConverter.java
index f0fce4a0d4..caed26f07b 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/AbstractRecordStreamKafkaMessageConverter.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/AbstractRecordStreamKafkaMessageConverter.java
@@ -47,6 +47,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Pattern;
 
 public abstract class AbstractRecordStreamKafkaMessageConverter implements KafkaMessageConverter {
@@ -153,8 +154,16 @@ public abstract class AbstractRecordStreamKafkaMessageConverter implements Kafka
                 throw ex;
             }
 
-            group = new RecordGroup(ff, writer);
+            final long offset = consumerRecord.getOffset();
+            final AtomicLong maxOffset = new AtomicLong(offset);
+            group = new RecordGroup(ff, writer, maxOffset);
             recordGroups.put(criteria, group);
+        } else {
+            final long recordOffset = consumerRecord.getOffset();
+            final AtomicLong maxOffset = group.maxOffset();
+            if (recordOffset > maxOffset.get()) {
+                maxOffset.set(recordOffset);
+            }
         }
 
         // let subclass convert into the thing to write
@@ -176,6 +185,10 @@ public abstract class AbstractRecordStreamKafkaMessageConverter implements Kafka
                 resultAttrs.putAll(wr.getAttributes());
                 resultAttrs.put("record.count", 
String.valueOf(wr.getRecordCount()));
                 resultAttrs.put(CoreAttributes.MIME_TYPE.key(), 
writer.getMimeType());
+
+                final long maxOffset = group.maxOffset().get();
+                resultAttrs.put(KafkaFlowFileAttribute.KAFKA_MAX_OFFSET, Long.toString(maxOffset));
+
                 // add any extra header-derived attributes
                 resultAttrs.putAll(criteria.extraAttributes());
                 resultAttrs.put(KafkaFlowFileAttribute.KAFKA_CONSUMER_OFFSETS_COMMITTED, String.valueOf(commitOffsets));
@@ -212,9 +225,9 @@ public abstract class AbstractRecordStreamKafkaMessageConverter implements Kafka
 
     protected abstract Record convertRecord(ByteRecord consumerRecord, Record record, Map<String, String> attributes) throws IOException;
 
-    private static record RecordGroupCriteria(RecordSchema schema, Map<String, String> extraAttributes, String topic, int partition) {
+    private record RecordGroupCriteria(RecordSchema schema, Map<String, String> extraAttributes, String topic, int partition) {
     }
 
-    private static record RecordGroup(FlowFile flowFile, RecordSetWriter writer) {
+    private record RecordGroup(FlowFile flowFile, RecordSetWriter writer, AtomicLong maxOffset) {
     }
 }
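
A note on the final hunk: a nested record is implicitly static, so dropping the redundant static modifier changes nothing. More interesting is that a Java record is shallowly immutable, which is why RecordGroup gains a mutable AtomicLong component rather than a long: the running maximum can be updated in place without rebuilding the group. A minimal sketch of the idea (illustrative names, not the NiFi types):

    import java.util.concurrent.atomic.AtomicLong;

    // The record reference itself never changes; only the counter inside it.
    record Group(String topic, AtomicLong maxOffset) {
        void observe(final long offset) {
            if (offset > maxOffset.get()) {
                maxOffset.set(offset);
            }
        }
    }

    // Group group = new Group("events", new AtomicLong(5L));
    // group.observe(9L);  // maxOffset() now reads 9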
