Pierre Villard created NIFI-15961:
-------------------------------------
Summary: Add Kafka Share Group support to ConsumeKafka
Key: NIFI-15961
URL: https://issues.apache.org/jira/browse/NIFI-15961
Project: Apache NiFi
Issue Type: Improvement
Components: Extensions
Reporter: Pierre Villard
Assignee: Pierre Villard
Extend the *ConsumeKafka* processor with optional support for Kafka share groups
([KIP-932|https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka]).
Share groups distribute records cooperatively across the consumers of a group
with per-record acknowledgement, decoupling consumer parallelism from the
number of partitions on the subscribed topics.
*Proposed Change*
Add a new *Group Type* property to *ConsumeKafka* with values *Consumer Group*
(the default) and {*}Share Group{*}. When *Share Group* is selected:
* The processor uses *KafkaShareConsumer* via a new
*KafkaShareConsumerService* SPI on *KafkaConnectionService* (added as a default
method that throws *UnsupportedOperationException* so out-of-tree
implementations stay binary- and source-compatible).
* A new *Acknowledgement Mode* property ({*}Explicit{*} default, *Implicit*
opt-in) controls how records are acknowledged. In *Explicit* mode, every record
is acknowledged individually; on session rollback, records are {*}RELEASE{*}d
back to the share group for immediate redelivery. In *Implicit* mode, all
delivered records are treated as *ACCEPT* on the next poll/commit; on rollback,
the consumer is closed so the broker's record-acquisition lock can expire and
the records become eligible for redelivery.
* Classic-group properties (Topic Format, Auto Offset Reset, Commit Offsets)
are hidden when *Share Group* is selected because they have no analogue in the
share-group protocol. The starting position for a new share group is managed
out of band via *kafka-share-groups.sh --reset-offsets* or
{*}Admin.alterShareGroupOffsets{*}.
* Verification and topic sampling continue to use *Explicit* acknowledgement
internally so sampled records are released back to the share group, regardless
of the user-selected mode.
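A minimal Java sketch of two points from the list above: adding the share-consumer accessor as a default method that throws *UnsupportedOperationException*, and the rollback behaviour implied by each acknowledgement mode. The interface shapes, the method name getShareConsumerService, the classes LegacyConnectionService and ShareAwareConnectionService, and the RollbackAction enum are assumptions made for illustration; the issue does not show the actual signatures, and the real NiFi interfaces may differ.

```java
final class ShareGroupSketch {

    /** Hypothetical stand-in for the share-consumer SPI named in the proposal. */
    interface KafkaShareConsumerService {
        String groupId();
    }

    /** Hypothetical stand-in for the existing connection service interface. */
    interface KafkaConnectionService {
        String getBrokerUri();

        // Added as a default method, so implementations written against the
        // older interface keep compiling and loading without changes.
        default KafkaShareConsumerService getShareConsumerService(String groupId) {
            throw new UnsupportedOperationException(
                    "Share groups are not supported by this connection service");
        }
    }

    /** An out-of-tree implementation written before share-group support existed. */
    static final class LegacyConnectionService implements KafkaConnectionService {
        @Override
        public String getBrokerUri() {
            return "localhost:9092";
        }
    }

    /** An implementation that opts in by overriding the default method. */
    static final class ShareAwareConnectionService implements KafkaConnectionService {
        @Override
        public String getBrokerUri() {
            return "localhost:9092";
        }

        @Override
        public KafkaShareConsumerService getShareConsumerService(String groupId) {
            return () -> groupId;
        }
    }

    /** What happens to in-flight records when the session rolls back (assumed naming). */
    enum RollbackAction { RELEASE_RECORDS, CLOSE_CONSUMER }

    /** The two modes from the proposal, mapped to their rollback behaviour. */
    enum AcknowledgementMode {
        EXPLICIT(RollbackAction.RELEASE_RECORDS),
        IMPLICIT(RollbackAction.CLOSE_CONSUMER);

        private final RollbackAction onRollback;

        AcknowledgementMode(RollbackAction onRollback) {
            this.onRollback = onRollback;
        }

        RollbackAction onRollback() {
            return onRollback;
        }
    }

    /** Probes a connection service for share-group support. */
    static boolean supportsShareGroups(KafkaConnectionService service) {
        try {
            service.getShareConsumerService("probe-group");
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(supportsShareGroups(new LegacyConnectionService()));
        System.out.println(supportsShareGroups(new ShareAwareConnectionService()));
        System.out.println(AcknowledgementMode.EXPLICIT.onRollback());
        System.out.println(AcknowledgementMode.IMPLICIT.onRollback());
    }
}
```

In this sketch the legacy implementation compiles untouched and only fails when share-group support is actually requested, which is the compatibility property the proposal describes.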