This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 4b71f10aec NIFI-14619 Added kafka.max.offset to Record FlowFiles from ConsumeKafka This closes #9983.
4b71f10aec is described below
commit 4b71f10aec31cb0ca7b2a3318c985e2d6c9131fd
Author: exceptionfactory <[email protected]>
AuthorDate: Fri May 30 15:12:31 2025 -0500
NIFI-14619 Added kafka.max.offset to Record FlowFiles from ConsumeKafka
This closes #9983.
Signed-off-by: Joseph Witt <[email protected]>
---
.../nifi/kafka/processors/ConsumeKafkaRecordIT.java | 2 ++
.../apache/nifi/kafka/processors/ConsumeKafka.java | 3 ++-
.../AbstractRecordStreamKafkaMessageConverter.java | 19 ++++++++++++++++---
3 files changed, 20 insertions(+), 4 deletions(-)
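
The new attribute captures the highest Kafka offset among the records bundled into a single FlowFile. As a standalone illustration of what the attribute represents (the Message record and class name below are illustrative, not part of the NiFi codebase):

    import java.util.List;

    class MaxOffsetSketch {
        // Stand-in for a consumed Kafka message
        record Message(long offset, String value) {}

        // The highest offset in a batch is what kafka.max.offset exposes
        static long maxOffset(List<Message> batch) {
            return batch.stream()
                    .mapToLong(Message::offset)
                    .max()
                    .orElseThrow(() -> new IllegalArgumentException("empty batch"));
        }

        public static void main(String[] args) {
            List<Message> batch = List.of(new Message(41, "a"), new Message(43, "b"), new Message(42, "c"));
            System.out.println(maxOffset(batch)); // prints 43
        }
    }
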
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaRecordIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaRecordIT.java
index d7c3508e56..6e8b37d6fc 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaRecordIT.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaRecordIT.java
@@ -168,6 +168,8 @@ class ConsumeKafkaRecordIT extends AbstractConsumeKafkaIT {
         flowFile.assertContentEquals(flowFileString);
         flowFile.assertAttributeEquals(KafkaFlowFileAttribute.KAFKA_TOPIC, topic);
         flowFile.assertAttributeEquals(KafkaFlowFileAttribute.KAFKA_PARTITION, Integer.toString(FIRST_PARTITION));
+        flowFile.assertAttributeExists(KafkaFlowFileAttribute.KAFKA_MAX_OFFSET);
+
         flowFile.assertAttributeEquals("record.count", Long.toString(TEST_RECORD_COUNT));
         flowFile.assertAttributeEquals("aaa", "value");
         flowFile.assertAttributeNotExists("bbb");
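
The same MockFlowFile assertions work in any unit test built on nifi-mock, outside the Kafka integration harness. A minimal sketch, assuming a hypothetical processor MyProcessor with a REL_SUCCESS relationship that writes the new attribute:

    import org.apache.nifi.util.MockFlowFile;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;
    import org.junit.jupiter.api.Test;

    class MaxOffsetAttributeTest {
        @Test
        void testMaxOffsetAttributeWritten() {
            // MyProcessor is a placeholder for the processor under test
            TestRunner runner = TestRunners.newTestRunner(new MyProcessor());
            runner.enqueue("payload");
            runner.run();

            MockFlowFile flowFile = runner.getFlowFilesForRelationship(MyProcessor.REL_SUCCESS).get(0);
            flowFile.assertAttributeExists("kafka.max.offset");
        }
    }
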
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
index 4643a02733..ad1bd6d2c8 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
@@ -96,7 +96,8 @@ import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
         @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TIMESTAMP, description = "The timestamp of the message in the partition of the topic."),
         @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_PARTITION, description = "The partition of the topic the message or message bundle is from"),
         @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TOPIC, description = "The topic the message or message bundle is from"),
-        @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TOMBSTONE, description = "Set to true if the consumed message is a tombstone message")
+        @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TOMBSTONE, description = "Set to true if the consumed message is a tombstone message"),
+        @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_MAX_OFFSET, description = "The maximum value of the Kafka offset in batch of records")
 })
 @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
 @SeeAlso({PublishKafka.class})
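
Downstream components can read the documented attribute like any other FlowFile attribute. A sketch of a hypothetical downstream processor (the class is illustrative; the attribute key is assumed to be the literal behind KafkaFlowFileAttribute.KAFKA_MAX_OFFSET):

    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.AbstractProcessor;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.exception.ProcessException;

    public class LogMaxOffset extends AbstractProcessor {
        @Override
        public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
            FlowFile flowFile = session.get();
            if (flowFile == null) {
                return;
            }
            // Attribute written by ConsumeKafka per this commit
            String maxOffset = flowFile.getAttribute("kafka.max.offset");
            getLogger().info("kafka.max.offset = {}", maxOffset);
            session.remove(flowFile); // a real processor would transfer to a relationship
        }
    }
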
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/AbstractRecordStreamKafkaMessageConverter.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/AbstractRecordStreamKafkaMessageConverter.java
index f0fce4a0d4..caed26f07b 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/AbstractRecordStreamKafkaMessageConverter.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/AbstractRecordStreamKafkaMessageConverter.java
@@ -47,6 +47,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Pattern;
 
 public abstract class AbstractRecordStreamKafkaMessageConverter implements KafkaMessageConverter {
@@ -153,8 +154,16 @@ public abstract class AbstractRecordStreamKafkaMessageConverter implements Kafka
                     throw ex;
                 }
 
-                group = new RecordGroup(ff, writer);
+                final long offset = consumerRecord.getOffset();
+                final AtomicLong maxOffset = new AtomicLong(offset);
+                group = new RecordGroup(ff, writer, maxOffset);
                 recordGroups.put(criteria, group);
+            } else {
+                final long recordOffset = consumerRecord.getOffset();
+                final AtomicLong maxOffset = group.maxOffset();
+                if (recordOffset > maxOffset.get()) {
+                    maxOffset.set(recordOffset);
+                }
             }
 
             // let subclass convert into the thing to write
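
The pattern above seeds an AtomicLong with the first offset of a group and raises it as later records arrive. A compact standalone sketch of the same running-maximum idea; AtomicLong.accumulateAndGet would be an equivalent one-call alternative to the explicit compare-and-set in the diff:

    import java.util.List;
    import java.util.concurrent.atomic.AtomicLong;

    class RunningMaxSketch {
        public static void main(String[] args) {
            List<Long> offsets = List.of(7L, 3L, 9L, 9L, 5L);

            // Seed with the first offset, as the converter does for a new RecordGroup
            AtomicLong maxOffset = new AtomicLong(offsets.get(0));

            // Keep the larger of the current maximum and each later offset
            for (long offset : offsets.subList(1, offsets.size())) {
                maxOffset.accumulateAndGet(offset, Math::max);
            }

            System.out.println(maxOffset.get()); // prints 9
        }
    }
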
@@ -176,6 +185,10 @@ public abstract class AbstractRecordStreamKafkaMessageConverter implements Kafka
             resultAttrs.putAll(wr.getAttributes());
             resultAttrs.put("record.count", String.valueOf(wr.getRecordCount()));
             resultAttrs.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+
+            final long maxOffset = group.maxOffset().get();
+            resultAttrs.put(KafkaFlowFileAttribute.KAFKA_MAX_OFFSET, Long.toString(maxOffset));
+
             // add any extra header-derived attributes
             resultAttrs.putAll(criteria.extraAttributes());
             resultAttrs.put(KafkaFlowFileAttribute.KAFKA_CONSUMER_OFFSETS_COMMITTED, String.valueOf(commitOffsets));
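
Assembling the attributes is a plain map-building step before the FlowFile is updated. A minimal sketch with literal keys standing in for the KafkaFlowFileAttribute and CoreAttributes constants:

    import java.util.HashMap;
    import java.util.Map;

    class AttributeMapSketch {
        public static void main(String[] args) {
            Map<String, String> resultAttrs = new HashMap<>();
            resultAttrs.put("record.count", String.valueOf(3));
            resultAttrs.put("mime.type", "application/json");   // CoreAttributes.MIME_TYPE.key()
            resultAttrs.put("kafka.max.offset", Long.toString(42L));
            resultAttrs.forEach((key, value) -> System.out.println(key + "=" + value));
        }
    }
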
@@ -212,9 +225,9 @@ public abstract class AbstractRecordStreamKafkaMessageConverter implements Kafka
     protected abstract Record convertRecord(ByteRecord consumerRecord, Record record, Map<String, String> attributes) throws IOException;
 
-    private static record RecordGroupCriteria(RecordSchema schema, Map<String, String> extraAttributes, String topic, int partition) {
+    private record RecordGroupCriteria(RecordSchema schema, Map<String, String> extraAttributes, String topic, int partition) {
     }
 
-    private static record RecordGroup(FlowFile flowFile, RecordSetWriter writer) {
+    private record RecordGroup(FlowFile flowFile, RecordSetWriter writer, AtomicLong maxOffset) {
     }
 }
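
A side note on the record changes: the redundant static modifier is dropped (nested records are implicitly static), and the AtomicLong component is what lets the otherwise immutable RecordGroup carry a mutable running maximum. A small illustration (names are illustrative):

    import java.util.concurrent.atomic.AtomicLong;

    class RecordMutabilitySketch {
        // Records are shallowly immutable: the maxOffset reference is fixed,
        // but the value inside the AtomicLong can still change
        record Group(String name, AtomicLong maxOffset) {}

        public static void main(String[] args) {
            Group group = new Group("example", new AtomicLong(10));
            group.maxOffset().set(25); // mutate through the fixed component
            System.out.println(group.maxOffset().get()); // prints 25
        }
    }
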